Throttling messages in Akka 2
Background
This blog post was written as a contribution to the Akka Summer of Blog series on the Akka Team Blog http://letitcrash.com. The code from the post is available on Github, see https://github.com/hbf/akka-throttler.
The goal
Our goal is to implement a message throttler, a piece of code that ensures that messages are not sent out at too high a rate.
Let’s see a concrete example. Suppose you are writing an application that makes HTTP requests to an external web service and that this web service has a restriction in place: you may not make more than 10 requests in 1 minute. You will get blocked or need to pay if you don’t stay under this limit. With a throttler, you can ensure that the requests you make do not overstep the threshold rate.
In the following we will change the nomenclature a bit and speak of messages only – even if in a concrete application the messages might actually be any kind of events, like the HTTP requests in the above example.
Akka
We will write a throttler for the Akka framework in the Scala language. Akka is a library that provides a programmer with actors, which are a very useful abstraction to build powerful concurrent and distributed applications. In case you do not know about actors, here’s a mini-micro-short intro to them. (If you know about Akka, feel free to jump to the next section.)
Actors, and clerks
An actor is an object that has a message inbox and can process the messages from its inbox. Think of an office clerk with a stacking tray in front of himself. You can put messages into the stacking tray and the clerk will pull the next message out of the tray and process it, one after the other. The clerk has a sense of fairness and processes the messages in first-in-first-out order. He also has a sense of laziness and just sleeps when the stacking tray is empty.
To avoid chaos, there are two rules:
- Rule 1: Each actor processes the messages from its inbox sequentially (not concurrently). So a clerk never picks two or more messages and works them side-by-side.
- Rule 2: If an actor sends to another actor two messages in a particular order then the receiving actor will receive them in the same order. So clerks can outsource work to their colleagues but again, they must take one message after the other out of the inbox and transfer them one after the other to the colleague.
Obviously, a single actor (a single clerk) will not give you a system where work is done in parallel. In an actor system, concurrency (i.e., parallelism) is achieved by having many actors. Notice that there are no rules specifying which actor runs when. This is left to the underlying system. All we know is that when an actor processes a certain message, that message is the only one from that actor’s inbox that is being processed (Rule 1 above). This is a good thing because it means that when we program an actor (i.e., specify how it processes its messages), we do not need to care about locks, synchronization, deadlocks, etc. – something that we would normally have to do when writing parallel programs. Very nice.
There’s a catch, though. Above we said that when an actor processes a certain message, Rule 1 guarantees that this message is the only one from the actor’s inbox that is being processed. What happens if another actor were processing the same message and happened to change it? Bang! We’re back in problem land. We would have to write thread-safe messages, use synchronization means, etc. This we won’t do, however. Instead, we simply stick to the following rule:
- Rule 3: All messages sent to actors must be immutable objects.
Basically, an actor system is this: a set of actors (clerks) that work with the messages according to Rules 1-3.
A very simple example
Here’s a “Hello World” example in Scala that features a single actor, helloActor of type HelloActor, to which two messages of type String are sent:
    import akka.actor.{Actor, ActorSystem, Props}

    class HelloActor extends Actor {
      def receive = {
        case "hello" => println("hello!")
        case _       => println("huh?")
      }
    }

    object Main extends App {
      val system = ActorSystem("HelloSystem")
      val helloActor = system.actorOf(Props[HelloActor])
      helloActor ! "hello"
      helloActor ! "buenos dias"
    }
The example will print out two lines, “hello!” and “huh?”. My short intro has not covered the technical details of creating actors, the !-syntax for sending them messages, etc. If you want to read more about this, please refer to the Akka documentation.
The goal in code
Now that we’ve seen the basics of Akka, let’s reformulate our goal using some sample code for Akka. We want to build a throttler – we will call our implementation TimerBasedThrottler – that behaves as follows:
    // A simple actor that prints whatever it receives
    val printer = system.actorOf(Props(new Actor {
      def receive = {
        case x => println(x)
      }
    }))

    // The throttler for this example, setting the rate
    val throttler = system.actorOf(Props(
      new TimerBasedThrottler(new Rate(3, Duration(1, TimeUnit.SECONDS)))))

    // Set the target
    throttler ! SetTarget(Some(printer))

    // These three messages will be sent to the printer immediately
    throttler ! Queue("1")
    throttler ! Queue("2")
    throttler ! Queue("3")

    // These two will wait at least until 1 second has passed
    throttler ! Queue("4")
    throttler ! Queue("5")
This code uses a class Rate and messages like this:
    case class Rate(numberOfCalls: Int, duration: Duration) {
      /** The duration in milliseconds. */
      def durationInMillis(): Long = duration.toMillis
    }

    case class SetTarget(target: Option[ActorRef])
    case class Queue(msg: Any)
A timer-based throttler
We will realize a very simple throttler, one that is based on a timer. Assuming the throttler’s rate is 3 msg/s like in the example above, our throttler will start a timer that triggers every second and each time will give our throttler exactly three vouchers; each voucher gives the throttler a right to deliver a message. In this way, at most 3 messages will be sent out by the throttler in each interval.
If you have children and you want to impose a limit on TV watching then this kind of throttling is most probably a concept well understood by you! Every week you provide your kid with a certain amount of “TV time” to use. If he or she uses it during the week, fine, but if not, it cannot be accumulated over weeks.
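To make the voucher idea concrete, here is a minimal sketch in plain Scala (no Akka involved) that computes when each message would be delivered. It assumes, for simplicity, that the timer intervals start at time 0, that arrival times are given in ascending order in milliseconds, and that at least one voucher is handed out per interval. The names VoucherSimulation and deliveryTimes are made up for this illustration and are not part of the throttler.

```scala
object VoucherSimulation {
  /** Delivery times (ms) for messages arriving at the given times (ms, ascending),
    * when `perInterval` vouchers are handed out at the start of every interval
    * of `intervalMs` milliseconds and unused vouchers expire. */
  def deliveryTimes(arrivals: List[Long], perInterval: Int, intervalMs: Long): List[Long] = {
    // State: index of the interval used last, and vouchers already spent in it.
    val (_, _, out) = arrivals.foldLeft((-1L, 0, Vector.empty[Long])) {
      case ((lastInterval, used, acc), arrival) =>
        // A message can never overtake an earlier one (first-in-first-out).
        val earliest = math.max(arrival / intervalMs, lastInterval)
        // If that interval has no vouchers left, wait for the next one.
        val interval =
          if (earliest == lastInterval && used >= perInterval) earliest + 1 else earliest
        val usedNow = if (interval == lastInterval) used + 1 else 1
        // Deliver on arrival if possible, else at the start of the chosen interval.
        val time = math.max(arrival, interval * intervalMs)
        (interval, usedNow, acc :+ time)
    }
    out.toList
  }
}
```

With a rate of 3 messages per 1000 ms, messages arriving at 0, 10, 20, 30 and 40 ms come out at 0, 10, 20, 1000 and 1000 ms: the first three use up the vouchers, the last two wait for the next interval.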
I should note here that such timer-based throttlers provide relatively weak guarantees:
- Only start times are taken into account. This may be a problem if, for example, the throttler is used to throttle requests to an external web service. If a web request takes very long on the server then the rate observed on the server may be higher.
- A timer-based throttler only makes guarantees for the intervals of its own timer. In our example, no more than 3 messages are delivered within such intervals. Other intervals on the timeline, however, may contain more calls.
The two cases are illustrated in the images below, each showing a timeline and three intervals of the timer. The message delivery times chosen by our throttler are indicated by dots, and as you can see, each interval contains at most 3 points, so our throttler works correctly. Still, there is in each example an interval (the red one) that is problematic. In the first scenario, this is because the delivery times are merely the start times of longer requests (indicated by the four bars above the timeline that start at the dots), so that the server observes four requests during the red interval. In the second scenario, the messages are centered around one of the points in time where the timer triggers, causing the red interval to contain too many messages.
For some application scenarios, the guarantees provided by a timer-based throttler might be too weak. Charles Cordingley’s blog post discusses a throttler with stronger guarantees (it solves problem 2 from above).
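The second weakness can be checked with a few lines of plain Scala. The sketch below uses made-up delivery times (in milliseconds) for a rate of 3 msg/s: three deliveries at the end of the first timer interval and three at the start of the second. The names SlidingWindowCheck, maxInAnyWindow and perTimerInterval are hypothetical helpers for this illustration only.

```scala
object SlidingWindowCheck {
  /** Largest number of delivery times (ms) that fall into any half-open
    * window [t, t + windowMs). It suffices to try windows that start at a
    * delivery time, since any window can be shifted to start at its first dot. */
  def maxInAnyWindow(deliveries: List[Long], windowMs: Long): Int =
    if (deliveries.isEmpty) 0
    else deliveries.map(t => deliveries.count(u => u >= t && u < t + windowMs)).max

  // Hypothetical deliveries: three at the end of the timer interval [0, 1000)
  // and three at the start of the timer interval [1000, 2000).
  val deliveries: List[Long] = List(900L, 950L, 990L, 1000L, 1010L, 1020L)

  // Number of deliveries per timer interval (keyed by interval index).
  val perTimerInterval: Map[Long, Int] =
    deliveries.groupBy(_ / 1000L).map { case (k, v) => k -> v.size }
}
```

Each timer interval contains exactly 3 deliveries, so the throttler has kept its promise, yet the one-second window starting at 900 ms contains all 6 of them.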
Finite State Machines
Our throttler will be an actor, so our first shot at implementing TimerBasedThrottler might be to subclass Akka’s Actor class and start coding. However, Akka provides a very nice abstraction on top of Actor that comes in very handy in our situation: Finite State Machines (trait FSM in Akka). If you haven’t heard about this computer science concept, don’t worry, it’s rather simple. (If you know it, jump to the next section.)
A finite state machine is an object that at all times is in exactly one of a pre-defined set of states. For example, if you modeled an elevator, the states could be Stopped – doors open, Stopped – doors closed, and Moving:
A finite state machine can transition from certain states to others, and each of these transitions may have an associated action. For example, an elevator can transition from state Stopped – doors closed to Moving, but it cannot transition to Moving from Stopped – doors open. When the elevator transitions from Stopped – doors open to Stopped – doors closed, it closes the doors and enables the “Open doors” button, etc. This is an example of an action associated with a particular transition.
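For readers who prefer code, the elevator could be sketched in plain Scala roughly as follows. This is only an illustration of the concept and does not use Akka’s FSM; the command names (CloseDoors, OpenDoors, Start, Stop) and the action strings are invented here.

```scala
object ElevatorFsm {
  sealed trait ElevatorState
  case object StoppedDoorsOpen extends ElevatorState
  case object StoppedDoorsClosed extends ElevatorState
  case object Moving extends ElevatorState

  // Hypothetical commands that may trigger a transition.
  sealed trait Command
  case object CloseDoors extends Command
  case object OpenDoors extends Command
  case object Start extends Command
  case object Stop extends Command

  /** The next state together with the associated action,
    * or None if the transition is not allowed. */
  def transition(state: ElevatorState, cmd: Command): Option[(ElevatorState, String)] =
    (state, cmd) match {
      case (StoppedDoorsOpen, CloseDoors)  => Some((StoppedDoorsClosed, "close doors"))
      case (StoppedDoorsClosed, OpenDoors) => Some((StoppedDoorsOpen, "open doors"))
      case (StoppedDoorsClosed, Start)     => Some((Moving, "start motor"))
      case (Moving, Stop)                  => Some((StoppedDoorsClosed, "stop motor"))
      case _                               => None // e.g. moving with open doors
    }
}
```

Asking for Start while the doors are open yields None, whereas the same command with closed doors yields the state Moving.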
The implementation
Our throttler should be resource-friendly in the sense that if there are no messages to be delivered, the timer will be turned off. In this way, we will not waste CPU time when no messages are to be sent in the first place. Therefore, we will make our throttler a state machine with two states: Idle and Active; in state Idle, we will turn the timer off.
Here’s a first attempt to define when we are in state Idle and Active:
The throttler will be in state Active if and only if a target is set and at least one message needs to be delivered.
Unfortunately, this does not really work, as this actor would be too forgetful. In our example of a rate of 3 msg/s, we can have 10 messages in a second: at each of the ten points in time where a message arrives, the throttler will for a moment be in state Active, starting the timer, but a moment after message delivery, it will transition back to state Idle, deactivating the timer. To fix this, we define the states as follows:
The throttler will be in state Active if and only if a target is set, and at least one message needs to be delivered or the timer of the latest delivered message is still running. Otherwise, the throttler is in state Idle.
Here is an example illustrating how this works:
The dots on the “message arrival” level are the points in time where messages arrive at the throttler; the dots on the “message delivery” level are the points in time when the throttler chooses to deliver these messages. As you can see, some of the messages get delivered right after their arrival. However, there is one message – the last in our example – that is queued up and delivered only at the start of the next timer interval when the number of vouchers is restored to 3. The third level shows the number of vouchers we have available over time. (All of this assumes a rate of 3 msg/s.)
A skeleton
To implement this, let’s start with the definition of the two states:
    sealed trait State
    case object Idle extends State
    case object Active extends State
With this, we can start coding our throttler:
    class TimerBasedThrottler(var rate: Rate) extends Actor with FSM[State, Data] {
      startWith(Idle, Data(None, rate.numberOfCalls, Q[Any]()))

      when(Idle) {
        // To do
      }

      when(Active) {
        // To do
      }

      onTransition {
        case Idle -> Active => setTimer("moreVouchers", Tick, rate.duration, repeat = true)
        case Active -> Idle => cancelTimer("moreVouchers")
      }

      initialize
    }

    // When active, the timer will send Tick messages to the throttler
    case object Tick
Let’s go through this in detail. As you can see, Akka’s FSM trait takes two type parameters, State and Data. State is the sealed trait we defined above, with the two states Idle and Active. The type parameter Data is a type that represents our state machine’s internal state: the FSM trait we mix in merely takes care of managing the state itself for us – we specify it initially using startWith and can tell the FSM to transition to other states, as will be explained shortly. If we need any additional information, we need to manage it ourselves. For this, we have two options. Firstly, we can store state in our class itself. We do this with the rate, for example, which we store as a var class parameter. Secondly, we can put state information into the Data type. In our case, Data is this:
    import scala.collection.immutable.{ Queue => Q }

    sealed case class Data(
      target: Option[ActorRef],
      vouchersLeft: Int,
      queue: Q[Any]
    )
Apart from the rate variable, our throttler’s state encompasses:
- a reference to the target, wrapped into an Option, so as to allow us to express that no target is set;
- the number of vouchers left in this time interval (i.e., before the next timer fires); and
- a queue of all messages that we have not yet delivered so far.
The body of class TimerBasedThrottler contains three constructs to define the behavior of our actor. First,
startWith(Idle, Data(None, rate.numberOfCalls, Q[Any]()))
tells our machine to begin in state Idle, with no target set, with the full number of vouchers available to make calls, and with no messages waiting to be delivered.
What follows are two constructs that – like startWith – are provided by the FSM trait. For each state of the FSM, we have a when-clause in which we will specify how the actor responds to Akka messages that it receives when it is in this particular state. As we will see, the when-clause offers a nice syntax to tell the machine to which state to transition, if a state change is desired, and to redefine its internal state. Finally, there is an onTransition-clause in which we specify the action that is to take place when the machine transitions from one state to another:
    onTransition {
      case Idle -> Active => setTimer("moreVouchers", Tick, rate.duration, repeat = true)
      case Active -> Idle => cancelTimer("moreVouchers")
    }
We distinguished between states Idle and Active in order to be able to turn the timer off. We do this here when going from Active to Idle. When we enter state Active, we start the timer, instructing the underlying FSM trait to send to the throttler a Tick message whenever the timer triggers.
Note: Our TimerBasedThrottler does not have code to support changing the rate (the version on Github has this). Still, we expose the variable rate of the class publicly. This is not great (but not critical because Akka actors are always “hidden” behind ActorRefs).
Delivering messages
Before coding the when-clauses, let’s settle how we deliver messages. Here comes a small helper that takes our throttler’s internal state (d in the code below) and sends as many messages as we have vouchers for. It then returns an appropriately altered copy of d that contains the remaining messages to be delivered and records the number of vouchers left for this timer period.
    import scala.math.min

    private def deliverMessages(d: Data): Data = {
      val nrOfMsgToSend = min(d.queue.length, d.vouchersLeft)

      // Tell sends the message, like ! in our Akka intro
      d.queue.take(nrOfMsgToSend).foreach(d.target.get.tell)

      // Return a copy without the delivered messages and the spent vouchers
      d.copy(
        queue = d.queue.drop(nrOfMsgToSend),
        vouchersLeft = d.vouchersLeft - nrOfMsgToSend
      )
    }
Notice that this version simply sends the message to the target. This has the drawback that the target will see the throttler as the sender of the message. In most use cases, it is probably preferable for the throttler to be transparent in the sense that the target will not see the throttler as the sender but instead the actor that sent the message to the throttler in the first place. The implementation you find on Github does exactly this.
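The voucher arithmetic of the helper, and the idea of remembering the original sender, can be tried out without Akka. The following is a minimal sketch under stated assumptions: Envelope is a hypothetical wrapper that stores the original sender as a plain string, the simplified Data has no target, and send is a callback standing in for the actual delivery. It is not the Github implementation. In Akka itself, the two-argument form of tell, which takes the sender explicitly, could presumably play the role of send.

```scala
import scala.collection.immutable.Queue
import scala.math.min

object DeliverySketch {
  // Hypothetical envelope remembering who originally sent the message.
  final case class Envelope(msg: Any, originalSender: String)

  // Plain-Scala stand-in for the throttler's Data (no ActorRef involved).
  final case class Data(vouchersLeft: Int, queue: Queue[Envelope])

  /** Hands as many queued envelopes to `send` as there are vouchers, oldest
    * first, and returns the state with those messages and vouchers removed. */
  def deliverMessages(d: Data)(send: Envelope => Unit): Data = {
    val n = min(d.queue.length, d.vouchersLeft)
    d.queue.take(n).foreach(send)
    d.copy(queue = d.queue.drop(n), vouchersLeft = d.vouchersLeft - n)
  }
}
```

With 2 vouchers and 3 queued envelopes, the first two are passed to send and the returned state holds 0 vouchers and the one remaining envelope.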
The Idle state
What must our throttler do when it is sitting in Idle state and a SetTarget(t) or Queue(msg) message arrives?
By definition of the Idle state, we know that in this state, we have no target, or there are no messages to be sent and the timer for the last message delivered (if any) is not running anymore. Therefore:
- When a SetTarget message arrives and our queue is non-empty, we can start sending messages.
- When a SetTarget message arrives and our queue is empty, we stay, recording the new target.
- When a Queue message arrives and the target is not set, we stay in Idle, after adding the new message to our queue.
- When a Queue message arrives and the target is set, we jump to Active, delivering the message that we just received.
In code, this looks as follows:
    when(Idle) {
      // Set the target
      case Event(SetTarget(t @ Some(_)), d) if !d.queue.isEmpty =>
        goto(Active) using deliverMessages(d.copy(target = t))
      case Event(SetTarget(t), d) =>
        stay using d.copy(target = t)

      // Queuing
      case Event(Queue(msg), d @ Data(None, _, queue)) =>
        stay using d.copy(queue = queue.enqueue(msg))
      case Event(Queue(msg), d @ Data(Some(_), _, Seq())) =>
        goto(Active) using deliverMessages(d.copy(queue = Q(msg)))
    }
The Active state
The Active state is similar:
- When a SetTarget message arrives to unset the target, we hop to Idle. With no target given, all we can do is queue messages.
- When a SetTarget message arrives to change the target to another one, we update our internal state, d.
- When a Queue message arrives and we have vouchers left, we use them; when we’ve run out of vouchers, we put the message at the end of our queue.
    when(Active) {
      // Set the target (when the new target is None)
      case Event(SetTarget(None), d) =>
        goto(Idle) using d.copy(target = None)

      // Set the target (when the new target is not None)
      case Event(SetTarget(t @ Some(_)), d) =>
        stay using d.copy(target = t)

      // Queue a message (when we cannot send messages in the
      // current period anymore)
      case Event(Queue(msg), d @ Data(_, 0, queue)) =>
        stay using d.copy(queue = queue.enqueue(msg))

      // Queue a message (when we can send some more messages
      // in the current period)
      case Event(Queue(msg), d @ Data(_, _, queue)) =>
        stay using deliverMessages(d.copy(queue = queue.enqueue(msg)))

      // Period ends and we have no more messages
      case Event(Tick, d @ Data(_, _, Seq())) =>
        goto(Idle)

      // Period ends and we get more occasions to send messages
      case Event(Tick, d @ Data(_, _, _)) =>
        stay using deliverMessages(d.copy(vouchersLeft = rate.numberOfCalls))
    }
Finally, we also have to deal with the Tick messages from the timer that will come in repeatedly. When a Tick message arrives, we check whether the queue is empty, in which case there is no more work to do and we can transition back to Idle, deactivating the timer. In case the queue is non-empty, we restore the vouchers and deliver as many messages as we have vouchers for.
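To see the two states play together without starting an actor system, the transitions can be modelled as a pure function. What follows is a simplified sketch, not the actual actor: the target is a plain name instead of an ActorRef, the Queue message is called Enqueue here to avoid a clash with Scala’s Queue collection, a stray Tick in Idle is simply ignored (an assumption), and ThrottlerModel, Step, step and run are names invented for this illustration.

```scala
import scala.collection.immutable.Queue

object ThrottlerModel {
  sealed trait State
  case object Idle extends State
  case object Active extends State

  sealed trait Event
  final case class SetTarget(target: Option[String]) extends Event
  final case class Enqueue(msg: String) extends Event // stands in for Queue(msg)
  case object Tick extends Event

  final case class Data(target: Option[String], vouchersLeft: Int, queue: Queue[String])

  /** Result of one step: new state, new data, and the messages delivered. */
  final case class Step(state: State, data: Data, delivered: List[String])

  // Deliver as many queued messages as there are vouchers left.
  private def deliver(state: State, d: Data): Step = {
    val n = math.min(d.queue.length, d.vouchersLeft)
    Step(state, d.copy(vouchersLeft = d.vouchersLeft - n, queue = d.queue.drop(n)),
      d.queue.take(n).toList)
  }

  def step(rate: Int)(state: State, d: Data, e: Event): Step = (state, e) match {
    case (Idle, SetTarget(t @ Some(_))) if d.queue.nonEmpty => deliver(Active, d.copy(target = t))
    case (Idle, SetTarget(t))                   => Step(Idle, d.copy(target = t), Nil)
    case (Idle, Enqueue(m)) if d.target.isEmpty => Step(Idle, d.copy(queue = d.queue.enqueue(m)), Nil)
    case (Idle, Enqueue(m))                     => deliver(Active, d.copy(queue = d.queue.enqueue(m)))
    case (Idle, Tick)                           => Step(Idle, d, Nil) // assumption: ignore
    case (Active, SetTarget(None))              => Step(Idle, d.copy(target = None), Nil)
    case (Active, SetTarget(t))                 => Step(Active, d.copy(target = t), Nil)
    // With zero vouchers left, deliver sends nothing, so the message is just queued.
    case (Active, Enqueue(m))                   => deliver(Active, d.copy(queue = d.queue.enqueue(m)))
    case (Active, Tick) if d.queue.isEmpty      => Step(Idle, d, Nil)
    case (Active, Tick)                         => deliver(Active, d.copy(vouchersLeft = rate))
  }

  /** Feeds the events to a fresh throttler; returns the final state and,
    * for each event, the messages delivered in response to it. */
  def run(rate: Int, events: List[Event]): (State, List[List[String]]) = {
    val start = Step(Idle, Data(None, rate, Queue.empty), Nil)
    val steps = events.scanLeft(start)((s, e) => step(rate)(s.state, s.data, e))
    (steps.last.state, steps.tail.map(_.delivered))
  }
}
```

Running this with a rate of 3 on the events SetTarget, five Enqueue messages and two Ticks delivers "1", "2" and "3" immediately, delivers "4" and "5" on the first Tick, and returns to Idle on the second.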
A race condition?
Notice that in coding the TimerBasedThrottler actor, we did not have to worry about concurrency issues. For example, our actor’s state contains the number of vouchers left, a number that is changed both by the actor and a timer. Had we done this with a traditional, asynchronous timer, we would have had to synchronize access. However, by having the FSM send us a Tick message we can stop worrying, since by Rule 1, no two messages of the same actor are processed in parallel. So whenever we change the number of vouchers, we are guaranteed to be the only one doing so.
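For comparison, here is roughly what the voucher bookkeeping might look like if a sender thread and a separate timer thread shared the counter directly. This is a hypothetical sketch of the alternative we avoided, with invented names (LockedVouchers, VoucherCounter); every access has to go through a lock.

```scala
object LockedVouchers {
  /** Voucher counter that is safe to share between a sender thread
    * and a timer thread, because all access is synchronized. */
  final class VoucherCounter(perInterval: Int) {
    private var left = perInterval

    /** Called by the sender: takes one voucher if any is left. */
    def tryAcquire(): Boolean = synchronized {
      if (left > 0) { left -= 1; true } else false
    }

    /** Called by the timer thread at the start of every interval. */
    def refill(): Unit = synchronized { left = perInterval }
  }
}
```

In the actor version none of this locking is needed, because the Tick that refills the vouchers is just another message in the inbox.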
Side remark: managing the FSM’s internal state
You may have wondered while reading through the above why we made rate a class parameter and put the additional state information target, vouchersLeft, and queue into the type Data. The reason for this is the following. All state in Data can be pattern-matched upon in when(…) {…} clauses, which often allows one to express the FSM’s logic in an elegant way. State you do not need to pattern-match is a candidate for storing as member fields; you can still use such state in pattern matching through Scala guard expressions. So it is a matter of taste and judgement to choose the appropriate place to store your state. As Roland Kuhn explained it to me, you can see when you’ve taken it too far in either direction:
- If Data has become a monster then it’s too much (or the FSM is doing too much by itself).
- If you find yourself writing guard expressions with your class variables a lot, then you’ve scattered your data too much.
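The difference between the two places to keep state can be seen in a small plain-Scala sketch. The names below (MatchVersusGuard, describe, WithField) are invented for this illustration, and Data is reduced to two numbers.

```scala
object MatchVersusGuard {
  final case class Data(vouchersLeft: Int, queued: Int)

  // Variant 1: vouchersLeft lives in Data, so "no vouchers" is simply a pattern.
  def describe(d: Data): String = d match {
    case Data(0, _) => "must queue"
    case Data(_, _) => "may deliver"
  }

  // Variant 2: the same information kept as a member field needs a guard.
  final class WithField(var vouchersLeft: Int) {
    def describe(queued: Int): String = queued match {
      case _ if vouchersLeft == 0 => "must queue"
      case _                      => "may deliver"
    }
  }
}
```

Both variants give the same answers; the first keeps the condition visible in the pattern, while the second hides it in a guard.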
Testing
Akka comes with a very useful TestKit that allows us to test the throttler. Here is a small excerpt from such a test.
    "respect the rate (3 msg/s)" in {
      val echo = system.actorOf(Props[TimerBasedThrottlerSpec.EchoActor])
      val throttler = system.actorOf(Props(new TimerBasedThrottler(3 msgsPer (1 second))))
      throttler ! SetTarget(Some(echo))
      throttler ! Queue("1")
      throttler ! Queue("2")
      throttler ! Queue("3")
      throttler ! Queue("4")
      throttler ! Queue("5")
      throttler ! Queue("6")
      throttler ! Queue("7")

      // First period: the three vouchers are used up at once
      within(1 second) {
        expectMsg("1")
        expectMsg("2")
        expectMsg("3")
        expectNoMsg(remaining)
      }
      // Second period: the next three messages
      within(1 second) {
        expectMsg("4")
        expectMsg("5")
        expectMsg("6")
        expectNoMsg(remaining)
      }
      // Third period: the last message
      within(1 second) {
        expectMsg("7")
      }
    }
Refer to the test suite on Github for the details.
Acknowledgements
Thanks to Roland Kuhn for his very helpful feedback on both the code and the blog post.