Parsing infinite streams with attoparsec


Posted on January 1, 2019 by wjwh


In a previous article, we looked at how Redis replication works and obtained a replication stream in our terminal using netcat. However, the data sent over was not very readable due to being encoded with the Redis Serialization Protocol (RESP). Since RESP is not native to most programs, parsing it will be necessary before an application can make use of the data. We’ll write a short parser with Haskell’s attoparsec library to parse the (potentially infinitely long) binary stream into a slightly more useful (but still potentially infinitely long) stream of parsed Redis values.

Parsing the Redis Serialization Protocol

As always when parsing something in Haskell, one of the most important things is deciding what data structure to parse the source text into. Luckily, the RESP documentation is extremely clear and we can pretty much write down the Haskell code straight from the text. A RedisValue can be an Integer, an Error, a SimpleString, a BulkString, or an Array containing zero or more RedisValues:

data RedisValue = RedisInteger Integer
                | RedisError B.ByteString
                | RedisSimpleString B.ByteString
                | RedisBulkString B.ByteString
                | RedisArray [RedisValue]

To make the printing later a little nicer we’ll define a Show instance for this datatype that unwraps the types and just shows their ‘contents’:

instance Show RedisValue where
  show (RedisArray elements) = concat $ ["[", intercalate "," (map show elements), "]"]
  show (RedisBulkString bs) = show bs
  show (RedisInteger num) = show num
  show (RedisError bs) = show bs
  show (RedisSimpleString bs) = show bs
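To make the wire format concrete before parsing it, here is a minimal sketch of the opposite direction: rendering a RedisValue back into RESP bytes. The encodeResp function is a hypothetical helper that does not appear in the original code; it only illustrates which prefix character and line terminators belong to each constructor.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString.Char8 as BC

-- Same shape as the RedisValue type above (Show instance omitted here).
data RedisValue
  = RedisInteger Integer
  | RedisError BC.ByteString
  | RedisSimpleString BC.ByteString
  | RedisBulkString BC.ByteString
  | RedisArray [RedisValue]

-- Hypothetical helper, not part of the article's code:
-- render a value in RESP wire format. Every header line ends in CRLF.
encodeResp :: RedisValue -> BC.ByteString
encodeResp (RedisInteger n)      = BC.concat [":", BC.pack (show n), "\r\n"]
encodeResp (RedisError e)        = BC.concat ["-", e, "\r\n"]
encodeResp (RedisSimpleString s) = BC.concat ["+", s, "\r\n"]
-- Bulk strings carry their byte length, then the payload, then CRLF.
encodeResp (RedisBulkString b)   =
  BC.concat ["$", BC.pack (show (BC.length b)), "\r\n", b, "\r\n"]
-- Arrays carry their element count, followed by the encoded elements.
encodeResp (RedisArray xs)       =
  BC.concat (["*", BC.pack (show (length xs)), "\r\n"] ++ map encodeResp xs)
```

For example, encodeResp (RedisArray [RedisBulkString "GET", RedisBulkString "a"]) yields the bytes "*2\r\n$3\r\nGET\r\n$1\r\na\r\n", which is the shape of the commands that show up in the replication stream.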

Every type of value has a separate prefixing character that uniquely identifies it. This makes parsing extremely straightforward. For the Integer, SimpleString and Error cases we just need to check whether they start with the correct magic character and then parse the text (for Error and SimpleString) or number (for Integer). They all end with a line terminator (RESP uses CRLF, which attoparsec's endOfLine accepts alongside a bare newline), so we parse that out as well:

parseRedisInteger = RedisInteger <$> (char ':' *> signed decimal) <* endOfLine
parseRedisSimpleString = RedisSimpleString <$> (char '+' *> AB.takeWhile (not . isEndOfLine)) <* endOfLine
parseRedisError = RedisError <$> (char '-' *> AB.takeWhile (not . isEndOfLine)) <* endOfLine

A BulkString takes a little more effort, since it is prefixed not only by a magic character, but also by the length of the string. The parser is still very straightforward:

parseRedisBulkString = do
  char '$'
  bStringLength <- decimal
  endOfLine
  actualString <- AB.take bStringLength
  endOfLine -- the payload is followed by a line terminator as well
  return $ RedisBulkString actualString

A parser for a RedisArray can be slightly confusing, since it can contain multiple instances of RedisValue within itself. Luckily, attoparsec has no problem with recursive definitions. The implementation once again flows directly from the documentation:

parseRedisArray = do
  char '*'
  numElements <- decimal
  endOfLine
  elements <- count numElements parseRedisValue
  return $ RedisArray elements

With all these sub-parsers present, the parser for a RedisValue is simply “any of these”:

parseRedisValue :: Parser RedisValue
parseRedisValue = choice [ parseRedisArray
                         , parseRedisBulkString
                         , parseRedisInteger
                         , parseRedisError
                         , parseRedisSimpleString
                         ]
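To try the parser without a Redis server, the pieces above can be collected into one module and run against a literal ByteString with parseOnly. This is a sketch rather than the exact contents of the gist: the import list is my assumption, the type derives Eq and Show instead of using the custom Show instance so that results can be compared, and the bulk string parser consumes the line terminator that follows the payload.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Attoparsec.ByteString as AB
import Data.Attoparsec.ByteString.Char8
  (Parser, char, choice, count, decimal, endOfLine, isEndOfLine, parseOnly, signed)
import qualified Data.ByteString as B

-- Derived instances (an assumption for this sketch) make results comparable.
data RedisValue
  = RedisInteger Integer
  | RedisError B.ByteString
  | RedisSimpleString B.ByteString
  | RedisBulkString B.ByteString
  | RedisArray [RedisValue]
  deriving (Eq, Show)

parseRedisInteger, parseRedisSimpleString, parseRedisError :: Parser RedisValue
parseRedisInteger = RedisInteger <$> (char ':' *> signed decimal) <* endOfLine
parseRedisSimpleString =
  RedisSimpleString <$> (char '+' *> AB.takeWhile (not . isEndOfLine)) <* endOfLine
parseRedisError =
  RedisError <$> (char '-' *> AB.takeWhile (not . isEndOfLine)) <* endOfLine

-- Length-prefixed payload, followed by a line terminator.
parseRedisBulkString :: Parser RedisValue
parseRedisBulkString = do
  _ <- char '$'
  len <- decimal
  endOfLine
  payload <- AB.take len
  endOfLine
  return (RedisBulkString payload)

-- Element count, then that many (possibly nested) values.
parseRedisArray :: Parser RedisValue
parseRedisArray = do
  _ <- char '*'
  numElements <- decimal
  endOfLine
  RedisArray <$> count numElements parseRedisValue

parseRedisValue :: Parser RedisValue
parseRedisValue = choice
  [ parseRedisArray
  , parseRedisBulkString
  , parseRedisInteger
  , parseRedisError
  , parseRedisSimpleString
  ]
```

In ghci, parseOnly parseRedisValue "*2\r\n$3\r\nGET\r\n$1\r\na\r\n" should come back as Right (RedisArray [RedisBulkString "GET",RedisBulkString "a"]). Note that parseOnly treats its argument as the complete input, so it is only suitable for experiments like this one and not for the stream itself.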

The whole thing is only 35 lines, including some whitespace and the type definitions. Now that we have a functional parser for the Redis data, we can get to the task of actually parsing the replication stream.

Applying the parser to the replication stream

As we saw in the previous article, you can get the replication stream by sending a SYNC\n command to a Redis server. In Haskell, we will need to acquire a TCP socket, but otherwise this should be no problem. Parsing all the commands will be more tricky. We could use the many' combinator to simply parse the stream into an infinite list of RedisValues, but this would rapidly run into problems because attoparsec will keep all of the input so far in memory to allow for arbitrary backtracking. So, it makes sense to parse only one value at a time, in order to allow the GC to let go of parts of the replication stream as fast as possible.

To actually accomplish this, we will make heavy use of attoparsec's parseWith, which can be supplied with a ‘refill’ function as well as a string to be parsed. If the initial string to be parsed only contains a partial RedisValue, attoparsec will call the refill function to obtain more data to parse, until there is enough data to either complete the parse or to decide that the input data does not describe a correct RedisValue. This will be extremely useful for acquiring more input from the replication stream, especially when some of the values that should be parsed could be extremely large. The refill function is any function in some monad (usually IO) that can be called to acquire more input. In our case, we will simply try to get more data from the Socket by using the recv function. Note that we can use the empty string as the initial value, since the empty string is not enough to succeed or fail parsing and so recv will be called immediately:

parseWith (recv sock 1024) parseRedisValue ""

Since the replication stream we get from the server does not necessarily come in neat 1024-byte long blocks, and since there could be more commands per block, we may have some data left over after parsing a RedisValue. Attoparsec will return both the parsed value and any remainder of the string for further parsing. In this example case we can simply print the value to STDOUT and recurse, feeding the ‘leftover’ string into parseWith as the initial input. If the leftover string does not contain a complete RedisValue, it will automatically call recv again to obtain more input.
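The refill and leftover behaviour can be observed without a socket. The sketch below replaces recv with a hypothetical scriptedRefill action that hands out pre-cut chunks from an IORef, and uses a simplified stand-in parser (RESP integers only) instead of parseRedisValue; both names are assumptions made for illustration.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.Attoparsec.ByteString.Char8
  (IResult (..), Parser, char, decimal, endOfLine, parseWith, signed)
import qualified Data.ByteString as B
import Data.IORef (IORef, atomicModifyIORef', newIORef)

-- Simplified stand-in for parseRedisValue: RESP integers only.
respInteger :: Parser Integer
respInteger = char ':' *> signed decimal <* endOfLine

-- Hypothetical replacement for `recv sock 1024`: returns the next scripted
-- chunk, or the empty string once the chunks run out (like a closed socket).
scriptedRefill :: IORef [B.ByteString] -> IO B.ByteString
scriptedRefill ref = atomicModifyIORef' ref pop
  where
    pop []       = ([], B.empty)
    pop (c : cs) = (cs, c)

-- Parse a single value, starting from empty input so that the refill
-- action is called right away. Returns the value and the unconsumed rest.
parseOneFromChunks :: [B.ByteString] -> IO (Either String (Integer, B.ByteString))
parseOneFromChunks chunks = do
  ref <- newIORef chunks
  result <- parseWith (scriptedRefill ref) respInteger B.empty
  return $ case result of
    Done rest value -> Right (value, rest)
    Fail _ _ msg    -> Left msg
    Partial _       -> Left "parser still wants input"
```

With the chunks [":1", "23\r\n:4"], the value is split across two reads and the second read already contains the start of the next value: parseOneFromChunks returns Right (123, ":4"), and ":4" is exactly the leftover that should be fed into the next call.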

streamingParser :: Socket -> B.ByteString -> IO ()
streamingParser sock leftovers = do
  result <- parseWith (recv sock 1024) parseRedisValue leftovers
  case result of
    Done rest value -> do
      print value
      streamingParser sock rest
    Fail _ _ msg -> error msg

The streamingParser function will loop infinitely, reading the replication stream from the supplied socket and printing the parsed RedisValues to the standard output. In a more serious application you may want to do additional filtering and/or transforming of the value before passing it on, instead of just printing the value (in addition to avoiding print, since it will convert any bytestrings into a regular String first, which may lead to performance problems).
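One possible shape for such a "more serious" loop is sketched below. It is a hypothetical generalisation, not the article's code: streamValues takes the refill action, the parser and a handler as arguments, returns a Left instead of calling error, and (as a design choice of this sketch) stops cleanly when the source is exhausted between two values. The collectValues helper exists only to exercise the loop without a socket.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Attoparsec.ByteString as AB
import Data.Attoparsec.ByteString (IResult (..), Parser, parseWith)
import qualified Data.ByteString as B
import Data.IORef (atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- Hypothetical generalisation of streamingParser: parse values one at a
-- time and hand each to `onValue` instead of printing it.
streamValues
  :: Monad m
  => m B.ByteString   -- refill action, e.g. `recv sock 1024`
  -> Parser a         -- parser for a single value
  -> (a -> m ())      -- what to do with each parsed value
  -> B.ByteString     -- leftover input from an earlier parse
  -> m (Either String ())
streamValues refill parser onValue = go
  where
    -- With no leftovers, fetch a chunk first: an empty chunk at a value
    -- boundary is treated as a clean end of stream (sketch-level choice).
    go leftovers
      | B.null leftovers = do
          chunk <- refill
          if B.null chunk then return (Right ()) else run chunk
      | otherwise = run leftovers
    run input = do
      result <- parseWith refill parser input
      case result of
        Done rest value -> onValue value >> go rest
        Fail _ _ msg    -> return (Left msg)
        Partial _       -> return (Left "parser still wants input")

-- Test helper: feed scripted chunks and collect the parsed values in order.
collectValues :: Parser a -> [B.ByteString] -> IO (Either String (), [a])
collectValues parser chunks = do
  input <- newIORef chunks
  seen  <- newIORef []
  let refill = atomicModifyIORef' input $ \cs -> case cs of
        []        -> ([], B.empty)
        (c : cs') -> (cs', c)
  outcome <- streamValues refill parser (\v -> modifyIORef' seen (v :)) B.empty
  values  <- reverse <$> readIORef seen
  return (outcome, values)
```

With a socket, the call would look like streamValues (recv sock 1024) parseRedisValue handler rest, where handler is whatever filtering or forwarding the application needs. A stream that ends in the middle of a value surfaces as a Left with attoparsec's failure message rather than as an exception.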

Hooking it all up

Now that we have a function which can parse an infinite stream of RedisValues from a socket, all we have to do is to hook it up to a socket and test it. We will just borrow some code from the documentation of Network.Socket for our client:

main = withSocketsDo $ do
    addr <- resolve "127.0.0.1" "6379"
    bracket (open addr) close talk
  where
    resolve host port = do
        let hints = defaultHints { addrSocketType = Stream }
        addr:_ <- getAddrInfo (Just hints) (Just host) (Just port)
        return addr
    open a = do
        sock <- socket (addrFamily a) (addrSocketType a) (addrProtocol a)
        connect sock $ addrAddress a
        return sock
    talk sock = do
        sendAll sock "SYNC\n"
        Done rest (RedisBulkString rdb) <- parseWith (recv sock 1024) parseRDB ""
        putStr $ "Skipped " ++ (show . B.length $ rdb) ++ " bytes of RDB data.\n"
        streamingParser sock rest

For some reason, the initial streaming of the .rdb file by the Redis master almost follows the Redis protocol specification for a RedisBulkString, but it does not include the newline at the end. I wrote a parseRDB parser for it which is exactly the same as the parseRedisBulkString parser except for the newline at the end. I have uploaded the whole thing as a gist if you are interested in running it yourself. The output looks like this (in ghci):
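For readers who do not want to open the gist, the RDB parser described above might look roughly like the following. This is a reconstruction from the description, not the gist's code: the name parseRDBSketch is hypothetical, and it returns the raw payload where the article's parseRDB wraps it in RedisBulkString.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Attoparsec.ByteString as AB
import Data.Attoparsec.ByteString.Char8 (Parser, char, decimal, endOfLine)
import qualified Data.ByteString as B

-- Assumed reconstruction of the RDB transfer parser: a '$', the payload
-- length, a line terminator, and then exactly that many bytes. Unlike a
-- regular bulk string, no line terminator is expected after the payload.
parseRDBSketch :: Parser B.ByteString
parseRDBSketch = do
  _ <- char '$'
  len <- decimal
  endOfLine
  AB.take len
```

Because nothing is consumed after the payload, whatever follows it in the same read (typically the start of the first replicated command) ends up in the leftover string and can be passed straight to the streaming loop.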

*RedisParsing> main
Skipped 177 bytes of RDB data.
["PING"]
["PING"]
["PING"]
["SELECT","0"]
["setex","a","5","abc"]
["DEL","a"]
["PING"]

In this example, I set up a connection to an empty Redis instance running locally on my machine. It sends over the RDB file and then starts pinging as usual. After some time I use the redis-cli tool to send a SETEX command and, 5 seconds later when the key expires, the master sends out a DEL for the expired key. This is all as expected.

Conclusion

It was pretty straightforward to define some parsers for the various shapes a RedisValue can have and to apply these to the incoming replication stream. However, operating like this will still not update the INFO section of the master. It also uses the older SYNC method instead of the newer PSYNC. In the next article we will look at ways to address these two shortcomings.