MailboxProcessor maintains an internal message queue, where multiple producers can post messages using the various Post method variants. These messages are then retrieved and processed by a single consumer (unless you implement it otherwise) using the Receive and Scan variants. By default, both producing and consuming messages are thread-safe.
By default, no error handling is provided. If an uncaught exception is thrown inside the processor's body, the body function ends and the processor stops handling messages: anything still in the queue is never processed, and a caller waiting on a reply channel (if one is used) never receives a response. The exception is reported only through the processor's Error event. If this behavior does not suit your use case, you have to provide the error handling yourself.
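One way to provide that error handling is to catch exceptions around the handling of each message, so a single bad message does not end the loop. The following is a minimal sketch; the string message type and the "boom" trigger are assumptions made purely for illustration.

```fsharp
// Hypothetical processor that takes plain strings as messages.
let resilientProcessor =
    MailboxProcessor<string>.Start(fun inbox ->
        let rec loop () =
            async {
                let! message = inbox.Receive()
                try
                    // Simulated failure for one particular message (illustrative only).
                    if message = "boom" then failwith "Something went wrong"
                    printfn "Processed: %s" message
                with ex ->
                    // The exception is handled here, so the loop keeps running.
                    printfn "Failed to process '%s': %s" message ex.Message
                return! loop ()
            }
        loop ())

resilientProcessor.Post "boom"
resilientProcessor.Post "still alive"
```

With this structure the second message is still processed after the first one fails.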
Basic Hello World
Let's first create a simple "Hello world!" MailboxProcessor which processes one type of message and prints greetings.
You'll need the message type. It can be anything, but Discriminated Unions are a natural choice here, as they list all the possible cases in one place and you can easily use pattern matching when processing them.
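As a sketch, such a message type could be a union with a single case that carries the name to greet. The names GreeterMessage and Greet are illustrative assumptions, not taken from the original text.

```fsharp
// Hypothetical message type: one case, carrying the name of the person to greet.
type GreeterMessage =
    | Greet of name: string
```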
Now define the processor itself. This can be done with the MailboxProcessor<'message>.Start static method, which returns a started processor ready to do its job. You can also use the constructor, but then you need to make sure to start the processor later.
The parameter to Start is a function which takes a reference to the MailboxProcessor itself (which doesn't exist yet, as you are just creating it, but will be available once the function executes). That gives you access to its various Receive and Scan methods for reading messages from the mailbox. Inside this function you can do whatever processing you need, but the usual approach is a recursive loop that reads the messages one by one and calls itself after each one.
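A minimal sketch of such a processor is shown below. The message type is repeated so the snippet runs on its own; its name and the name innerLoop are illustrative assumptions.

```fsharp
// Illustrative message type (an assumption, not from the original text).
type GreeterMessage =
    | Greet of name: string

// Start returns a processor that is already running.
let processor =
    MailboxProcessor<GreeterMessage>.Start(fun inbox ->
        // Recursive async loop: wait for one message, handle it, then loop again.
        let rec innerLoop () =
            async {
                let! message = inbox.Receive()
                match message with
                | Greet name ->
                    printfn "Hi, %s! This is mailbox processor's inner loop!" name
                return! innerLoop ()
            }
        innerLoop ())
```

Because the loop ends with return!, the recursive call is a tail call in the async workflow and the loop can run indefinitely.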
Now the processor is ready, but it doesn't do anything! Why? You need to send it a message to process. This is done with the Post method variants - let's use the most basic, fire-and-forget one.
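Posting could look like the following sketch. A compact greeter processor is included so the snippet runs on its own; all names are illustrative assumptions, and the final Sleep is only there so a short-lived console program does not exit before the message is handled.

```fsharp
type GreeterMessage =
    | Greet of name: string

let processor =
    MailboxProcessor<GreeterMessage>.Start(fun inbox ->
        let rec innerLoop () =
            async {
                let! (Greet name) = inbox.Receive()
                printfn "Hi, %s! This is mailbox processor's inner loop!" name
                return! innerLoop ()
            }
        innerLoop ())

// Fire-and-forget: Post enqueues the message and returns immediately.
processor.Post(Greet "Alice")

// Give the processor a moment to handle the message (not needed in F# Interactive).
System.Threading.Thread.Sleep 500
```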
This puts a message into the processor's internal queue, the mailbox, and returns immediately so that the calling code can continue. Once the processor retrieves the message, it will process it, but that happens asynchronously with respect to posting, most likely on a separate thread.
Very soon afterwards you should see the message "Hi, Alice! This is mailbox processor's inner loop!" printed to the output, and you're ready for more complicated samples.
Mutable State Management
Mailbox processors can be used to manage mutable state in a transparent and thread-safe way. Let's build a simple counter.
Now let's generate some operations
And you will see the following log
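A sketch of such a counter, a few operations, and the log they produce is shown below. The names CounterMessage, createProcessor and the initial value 10 are assumptions chosen for illustration. The state is passed as a parameter of the recursive loop.

```fsharp
type CounterMessage =
    | Increment
    | Decrement

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // The current state travels through the loop as a parameter.
        let rec innerLoop state =
            async {
                printfn "Waiting for message, the current state is: %i" state
                let! message = inbox.Receive()
                let newState =
                    match message with
                    | Increment -> state + 1
                    | Decrement -> state - 1
                return! innerLoop newState
            }
        innerLoop initialState)

let processor = createProcessor 10

processor.Post Increment
processor.Post Increment
processor.Post Decrement
processor.Post Increment

// Give the processor time to drain the queue (not needed in F# Interactive).
System.Threading.Thread.Sleep 500

// Log printed by the processor:
// Waiting for message, the current state is: 10
// Waiting for message, the current state is: 11
// Waiting for message, the current state is: 12
// Waiting for message, the current state is: 11
// Waiting for message, the current state is: 12
```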
Since the mailbox processor handles messages one by one with no interleaving, you can also produce messages from multiple threads without seeing the typical problems of lost or duplicated operations. A message can never observe a stale state left over from another message, unless you specifically implement the processor that way.
All messages are posted from different threads. The order in which messages are posted to the mailbox is not deterministic, so the order of processing them is not deterministic, but since the overall number of increments and decrements is balanced, you will see the final state being 0, no matter in what order and from which threads the messages were sent.
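The following sketch posts a balanced number of increments and decrements in parallel and then reads the final state. To read the state it adds a hypothetical GetState case that uses a reply channel (reply channels are described later in this document); that case and all names here are assumptions for illustration.

```fsharp
type CounterMessage =
    | Increment
    | Decrement
    | GetState of AsyncReplyChannel<int>

let counter =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        let rec innerLoop state =
            async {
                let! message = inbox.Receive()
                match message with
                | Increment -> return! innerLoop (state + 1)
                | Decrement -> return! innerLoop (state - 1)
                | GetState replyChannel ->
                    replyChannel.Reply(state)
                    return! innerLoop state
            }
        innerLoop 0)

// 1000 increments and 1000 decrements, posted in parallel in no particular order.
[ for _ in 1 .. 1000 do
    yield async { counter.Post Increment }
    yield async { counter.Post Decrement } ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

// All posts have completed, so GetState is queued behind every one of them.
let finalState = counter.PostAndReply(fun replyChannel -> GetState replyChannel)
printfn "Final state: %d" finalState // prints "Final state: 0"
```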
True Mutable State
In the previous example we've only simulated mutable state by passing the recursive loop parameter, but mailbox processor has all these properties even for a truly mutable state. This is important when you maintain large state and immutability is impractical for performance reasons.
We can rewrite our counter to the following implementation
Even though this would definitely not be thread safe if the counter state was modified directly from multiple threads, you can see by using the parallel message Posts from previous section that mailbox processor processes the messages one after another with no interleaving, so each message uses the most current value.
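A sketch of a counter with truly mutable state follows. It keeps the value in a ref cell that only the processor's loop touches; the ref cell, the GetState case used to read the result, and all names are assumptions for illustration.

```fsharp
type CounterMessage =
    | Increment
    | Decrement
    | GetState of AsyncReplyChannel<int>

let createProcessor (initialState: int) =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // Mutated in place; never accessed from outside the processor's loop.
        let state = ref initialState
        let rec innerLoop () =
            async {
                let! message = inbox.Receive()
                match message with
                | Increment -> state.Value <- state.Value + 1
                | Decrement -> state.Value <- state.Value - 1
                | GetState replyChannel -> replyChannel.Reply(state.Value)
                return! innerLoop ()
            }
        innerLoop ())

let counter = createProcessor 0

// Balanced increments and decrements posted in parallel.
[ for _ in 1 .. 1000 do
    yield async { counter.Post Increment }
    yield async { counter.Post Decrement } ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

let finalState = counter.PostAndReply(fun replyChannel -> GetState replyChannel)
printfn "Final state: %d" finalState // prints "Final state: 0"
```

A ref cell is used here because a local declared with let mutable cannot be captured by the closures that an async workflow creates.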
Out-of-Order Message Processing
You can use the Scan or TryScan methods to look for specific messages in the queue and process them regardless of how many messages are ahead of them. Both methods examine the queued messages in the order they arrived and look for a message matching the given criteria (up to an optional timeout). If there is no such message, TryScan returns None, while Scan keeps waiting until such a message arrives or the operation times out.
Let's see it in practice. We want the processor to process RegularOperations when it can, but whenever there is a PriorityOperation, it should be processed as soon as possible, no matter how many other RegularOperations are in the queue.
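A sketch of such a processor is shown below. It uses TryScan with a timeout of 0 to check for a queued PriorityOperation without waiting, and falls back to Receive otherwise. The string payloads, the handle helper and the processed log are assumptions added so the processing order can be inspected. The processor is created with the constructor and started only after the messages are posted, which makes the order deterministic for this demonstration.

```fsharp
open System.Collections.Concurrent

type Message =
    | PriorityOperation of string
    | RegularOperation of string

// Records the order in which messages were handled (illustrative helper).
let processed = ConcurrentQueue<string>()

let handle kind data =
    let line = sprintf "%s: %s" kind data
    printfn "%s" line
    processed.Enqueue line

// Scanner: picks priority messages and leaves everything else in the queue.
let tryGetPriority message =
    match message with
    | PriorityOperation data -> Some(async { return data })
    | RegularOperation _ -> None

let processor =
    new MailboxProcessor<Message>(fun inbox ->
        let rec innerLoop () =
            async {
                // Timeout 0: look at what is queued right now, do not wait.
                let! priority = inbox.TryScan(tryGetPriority, 0)
                match priority with
                | Some data -> handle "Priority" data
                | None ->
                    // No priority message waiting: take the next message in order.
                    let! message = inbox.Receive()
                    match message with
                    | PriorityOperation data -> handle "Priority" data
                    | RegularOperation data -> handle "Regular" data
                return! innerLoop ()
            }
        innerLoop ())

// Queue the messages first, then start processing.
processor.Post(RegularOperation "first")
processor.Post(RegularOperation "second")
processor.Post(PriorityOperation "urgent")
processor.Post(RegularOperation "third")
processor.Start()

// Give the processor time to drain the queue (not needed in F# Interactive).
System.Threading.Thread.Sleep 500
```

Although the priority message was posted third, it is handled first, followed by the regular messages in their original order.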
You can asynchronously return a value for each processed message if you send an AsyncReplyChannel<'a> as part of the message. The mailbox processor can then use this channel when processing the message to send a value back to the caller.
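As a sketch, the message type and a processor that replies through the channel might look like this. InputData, the concrete types behind InputData and OutputData, and the reply text are assumptions for illustration.

```fsharp
// Hypothetical payload types, kept simple for illustration.
type InputData = int
type OutputData = string

// The reply channel travels with the message.
type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>

let processor =
    MailboxProcessor<MessageWithResponse>.Start(fun inbox ->
        let rec innerLoop () =
            async {
                let! (Message(input, replyChannel)) = inbox.Receive()
                // Send the result back to whoever posted this message.
                replyChannel.Reply(sprintf "Received %d" input)
                return! innerLoop ()
            }
        innerLoop ())
```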
Now, to create a message, you need an AsyncReplyChannel<'a> - what is it and how do you create a working instance? The best way is to let MailboxProcessor provide it for you and expose the response as a more common Async<'a>. This can be done with, for example, the PostAndAsyncReply method, where you don't post the complete message, but instead a function of type (in our case) AsyncReplyChannel<OutputData> -> MessageWithResponse.
Posting this way puts the message in the queue and awaits the reply, which arrives once the processor gets to this message and replies using the channel.
There is also a synchronous variant, PostAndReply, which blocks the calling thread until the processor replies.
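Both variants can be sketched as follows. The processor and its types are repeated so the snippet runs on its own; InputData, the concrete payload types and the reply text are illustrative assumptions.

```fsharp
type InputData = int
type OutputData = string
type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>

let processor =
    MailboxProcessor<MessageWithResponse>.Start(fun inbox ->
        let rec innerLoop () =
            async {
                let! (Message(input, replyChannel)) = inbox.Receive()
                replyChannel.Reply(sprintf "Received %d" input)
                return! innerLoop ()
            }
        innerLoop ())

// Asynchronous variant: the result is an Async<OutputData> that can be awaited.
let asyncReply : Async<OutputData> =
    processor.PostAndAsyncReply(fun replyChannel -> Message(1, replyChannel))

let first = Async.RunSynchronously asyncReply

// Synchronous variant: blocks the calling thread until the reply arrives.
let second = processor.PostAndReply(fun replyChannel -> Message(2, replyChannel))

printfn "%s, %s" first second // prints "Received 1, Received 2"
```

In both cases the function passed in receives the channel created by the processor and wraps it into the message.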