Redis Stream Tutorial In 20 Minutes

By Alex

  • 17 Feb 2019
  • Topics: Redis, Stream

Redis 5 introduces a major new feature: the stream.

A stream is a log-like storage structure to which you can append data. Each appended entry is given an ID, which by default is generated from a timestamp. Streams also offer a convenient model for reading data.

This makes streams suitable for message queues and time series storage.

2. Installation

We need Redis 5.0, the first version that supports streams. Here we run it in a Docker redis container:

 docker run --name redis5 -p 6379:6379 -d redis:5.0-rc3

The redis client:

 docker run -it --link redis5:redis --rm redis redis-cli -h redis -p 6379

Once started, the client enters the interactive command line:

 redis:6379>

3. Usage

3.1 Adding elements to stream

The stream elements can be one or more key-value pairs. Let's add elements to the stream:

 redis:6379> XADD mystream * sensor-id 1234 temperature 19.8
 1531989605376-0

From the above we can see that:

  • mystream is the key of the stream;
  • the argument in the position of * is the element ID, and * tells the server to generate the ID automatically;
  • the added element contains 2 key-value pairs, sensor-id 1234 and temperature 19.8;
  • the returned value is the ID of the newly added element, consisting of a millisecond timestamp and an incrementing sequence number, separated by a hyphen.
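To make the ID format concrete, here is a minimal Python sketch that splits an ID into its two parts and converts the timestamp part to a date. The names StreamId, parse_stream_id and id_to_datetime are made up for this illustration and are not part of Redis or of any client library.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class StreamId(NamedTuple):
    ms: int   # milliseconds since the Unix epoch
    seq: int  # sequence number for entries added in the same millisecond


def parse_stream_id(raw: str) -> StreamId:
    """Split an ID such as '1531989605376-0' into its two integer parts."""
    ms, _, seq = raw.partition("-")
    # Assumption for this sketch: a missing sequence part is treated as 0.
    return StreamId(int(ms), int(seq) if seq else 0)


def id_to_datetime(stream_id: StreamId) -> datetime:
    """Convert the millisecond part of an ID to a UTC datetime."""
    return datetime.fromtimestamp(stream_id.ms / 1000, tz=timezone.utc)


sid = parse_stream_id("1531989605376-0")
print(sid, id_to_datetime(sid).isoformat())
```

Because the parsed IDs are plain tuples, comparing them orders entries first by timestamp and then by sequence number, which is how the IDs in a stream are ordered.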

You can also get the number of elements in the Stream:

 redis:6379> XLEN mystream
 (integer) 1

3.2 Range query

A range query takes a start ID and an end ID, which amounts to specifying a time range:

 redis:6379> XRANGE mystream 1531989605376 1531989605377
 1) 1) 1531989605376-0
    2) 1) "sensor-id"
       2) "1234"
       3) "temperature"
       4) "19.8"

You can use - for the smallest ID and + for the biggest ID:

 redis:6379> XRANGE mystream - +
 1) 1) 1531989605376-0
    2) 1) "sensor-id"
       2) "1234"
       3) "temperature"
       4) "19.8"

When a query would return too many elements, you can limit the number of results with the COUNT parameter, much like paging in a database query:

 redis:6379> XRANGE mystream - + COUNT 2
 1) 1) 1531989605376-0
    2) 1) "sensor-id"
       2) "1234"
       3) "temperature"
       4) "19.8"

You can also use the XREVRANGE command to query in reverse order. It works like XRANGE, except that the end ID is given first and the start ID second.
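The paging idea can be sketched with a toy in-memory model. This is plain Python over a sorted list, not a Redis client. The functions xrange_ids and pages are hypothetical names, and IDs are modelled as (milliseconds, sequence) tuples. Each page starts just after the last ID of the previous page, which is done here by adding 1 to the sequence number.

```python
from bisect import bisect_left, bisect_right

# Entry IDs in ascending order, standing in for the contents of a stream.
ids = [(1000, 0), (1000, 1), (1001, 0), (1005, 0), (1005, 1)]


def xrange_ids(ids, start, end, count=None):
    """Return the IDs in the inclusive range [start, end], at most count of them."""
    lo = bisect_left(ids, start)
    hi = bisect_right(ids, end)
    found = ids[lo:hi]
    return found if count is None else found[:count]


def pages(ids, page_size):
    """Yield successive pages, each starting right after the previous page's last ID."""
    start = (0, 0)                           # plays the role of "-"
    end = (float("inf"), float("inf"))       # plays the role of "+"
    while True:
        page = xrange_ids(ids, start, end, page_size)
        if not page:
            return
        yield page
        ms, seq = page[-1]
        start = (ms, seq + 1)                # smallest ID greater than the last one seen


for page in pages(ids, 2):
    print(page)
```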

3.3 Listening for new elements of stream

 redis:6379> XREAD COUNT 2 STREAMS mystream 0
 1) 1) "mystream"
    2) 1) 1) 1531989605376-0
          2) 1) "sensor-id"
             2) "1234"
             3) "temperature"
             4) "19.8"

The mystream after STREAMS is the key of the target stream. The command returns the elements of that stream whose IDs are greater than the specified ID; here the ID is 0, the smallest one, so reading starts from the beginning. COUNT is the maximum number of elements to return.

Multiple streams can be specified together, such as STREAMS mystream otherstream 0 0.

3.3.1 Blocking listener

If you execute the following in client 1:

 redis:6379> XREAD BLOCK 0 STREAMS mystream $

it will enter the waiting state.

If you then add an element from client 2:

 redis:6379> XADD mystream * test 1

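the element just added will be displayed in client 1:

 1) 1) "mystream"
    2) 1) 1) 1531994510562-0
          2) 1) "test"
             2) "1"

The 0 after BLOCK is the timeout in milliseconds, and 0 means the command never times out. $ stands for the maximum ID in the stream at the time of the call, so only elements added afterwards are returned.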
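The difference between the IDs 0 and $ can be illustrated with a small in-memory model. This sketch does not block and does not talk to Redis. The functions resolve_start_id and xread are hypothetical stand-ins, and IDs are modelled as (milliseconds, sequence) tuples.

```python
def resolve_start_id(entries, requested):
    """Translate the special ID '$' into the largest ID currently stored."""
    if requested == "$":
        return entries[-1][0] if entries else (0, 0)
    return requested


def xread(entries, last_id, count=None):
    """Return the entries whose ID is strictly greater than last_id."""
    newer = [entry for entry in entries if entry[0] > last_id]
    return newer if count is None else newer[:count]


entries = [((1531989605376, 0), {"sensor-id": "1234", "temperature": "19.8"})]

from_start = xread(entries, (0, 0))          # ID 0: everything stored so far
last_id = resolve_start_id(entries, "$")     # ID $: only what arrives later
nothing_yet = xread(entries, last_id)        # empty, nothing new has arrived

entries.append(((1531994510562, 0), {"test": "1"}))  # another client adds an element
arrived = xread(entries, last_id)
last_id = arrived[-1][0]   # continue from the last ID seen rather than from "$"

print(from_start, nothing_yet, arrived, last_id)
```

In a real listening loop the same pattern applies: after the first call with $, pass the ID of the last element received to the next call, so that elements added between two calls are not missed.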

3.4 Consumer Group

When the volume of data in a stream is very large, or when processing each message is time consuming, a single consumer comes under heavy pressure. Redis streams therefore provide consumer groups, which let multiple consumers share the processing of the same stream for load balancing.

For example, suppose there are 3 consumers C1, C2, and C3 that take turns reading one message at a time, and 7 messages in the stream. The messages are then allocated as follows:

 1 -> C1
 2 -> C2
 3 -> C3
 4 -> C1
 5 -> C2
 6 -> C3
 7 -> C1
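The allocation above can be reproduced with a few lines of Python. This is only a sketch of the idealized case in which the consumers take turns. In practice each message goes to whichever consumer asks for new messages next, so the real pattern depends on how fast each consumer reads. The function name round_robin is made up for this example.

```python
from itertools import cycle


def round_robin(message_ids, consumers):
    """Assign each message to the next consumer in turn."""
    return {message: consumer for message, consumer in zip(message_ids, cycle(consumers))}


assignment = round_robin(range(1, 8), ["C1", "C2", "C3"])
for message, consumer in assignment.items():
    print(message, "->", consumer)
```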

3.4.1 Create a Consumer Group

 redis:6379> XGROUP CREATE mystream mygroup01 $
 OK

This creates a consumer group named mygroup01 for the stream mystream. $ means the group starts after the current maximum ID, so it only receives elements added from now on.

3.4.2 Add test data

Add some new data:

 redis:6379> XADD mystream * message apple
 1531999977149-0
 redis:6379> XADD mystream * message orange
 1531999980272-0
 redis:6379> XADD mystream * message strawberry
 1531999983493-0
 redis:6379> XADD mystream * message apricot
 1531999988458-0
 redis:6379> XADD mystream * message banana
 1531999991782-0

3.4.3 Read data through the consumer group

 redis:6379> XREADGROUP GROUP mygroup01 Alice COUNT 1 STREAMS mystream >
 1) 1) "mystream"
    2) 1) 1) 1531999977149-0
          2) 1) "message"
             2) "apple"

Alice is the name of the group member, and > requests data that has not yet been delivered to any member of the group.

As you can see, you don't need to create group members in advance, as they will be created automatically the first time they are used.

Now let's read the data as another member:

 redis:6379> XREADGROUP GROUP mygroup01 Bob COUNT 1 STREAMS mystream >
 1) 1) "mystream"
    2) 1) 1) 1531999980272-0
          2) 1) "message"
             2) "orange"

3.4.4 Consumption history

 redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0
 1) 1) "mystream"
    2) 1) 1) 1531999977149-0
          2) 1) "message"
             2) "apple"

The ID given at the end here is 0, so the command returns Alice's pending history: the data that was delivered to her but whose consumption she has not yet confirmed. This helps a consumer pick up its work again after a recovery.

3.4.5 Consumption confirmation

 redis:6379> XACK mystream mygroup01 1531999977149-0
 (integer) 1

1531999977149-0 is the ID of the apple message consumed by Alice. Let's check Alice's consumption history now:

 redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0
 1) 1) "mystream"
    2) (empty list or set)

It's already empty.
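The read, history and confirmation steps can be summarized with a toy in-memory model. This is a sketch for illustration only, not how Redis implements consumer groups. The class ToyGroup and its methods are hypothetical names, and unlike the group created with $ above, this model starts delivering from its first entry.

```python
class ToyGroup:
    """In-memory stand-in for one consumer group on one stream."""

    def __init__(self, entries):
        self.entries = list(entries)  # [(id, fields), ...] in ID order
        self.next_index = 0           # first entry never delivered to anyone
        self.pending = {}             # entry ID -> name of the owning consumer

    def read_new(self, consumer, count=1):
        """Like reading with the ID '>': deliver entries nobody has received yet."""
        batch = self.entries[self.next_index:self.next_index + count]
        self.next_index += len(batch)
        for entry_id, _ in batch:
            self.pending[entry_id] = consumer
        return batch

    def read_pending(self, consumer):
        """Like reading with the ID '0': this consumer's unconfirmed entries."""
        return [e for e in self.entries if self.pending.get(e[0]) == consumer]

    def ack(self, *entry_ids):
        """Like XACK: remove entries from the pending list, return how many."""
        return sum(1 for i in entry_ids if self.pending.pop(i, None) is not None)


group = ToyGroup([
    ("1531999977149-0", {"message": "apple"}),
    ("1531999980272-0", {"message": "orange"}),
])
group.read_new("Alice")                      # Alice receives apple
group.read_new("Bob")                        # Bob receives orange
before = group.read_pending("Alice")         # apple is still unconfirmed
acked = group.ack("1531999977149-0")
after = group.read_pending("Alice")          # nothing left for Alice
print(before, acked, after)
```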

3.4.6 Failure Processing

So when a consumer runs into a problem and is later recovered, it can fetch the message data it has not confirmed yet. This is a safety mechanism, but what if the failed consumer can never be recovered? Is there any way to deal with the message data that it left unconfirmed?

Redis streams provide a solution for this situation:

  1. Find all the message data that has been delivered but not confirmed;
  2. Change the owner of this data.

A new consumer can then process the data.

List the unprocessed data:

 redis:6379> XPENDING mystream mygroup01 - + 10
 1) 1) 1531999980272-0
    2) "Bob"
    3) (integer) 45126376
    4) (integer) 2
 2) 1) 1531999983493-0
    2) "Tom"
    3) (integer) 867475
    4) (integer) 1

You can see that 2 pieces of data are unprocessed. For each one, the output lists its ID, its owner, its idle time (in milliseconds), and the number of times it has been delivered.

Change the owner:

 redis:6379> XCLAIM mystream mygroup01 Gates 3600 1531999980272-0 1531999983493-0
 1) 1) 1531999980272-0
    2) 1) "message"
       2) "orange"
 2) 1) 1531999983493-0
    2) 1) "message"
       2) "strawberry"

This hands the messages with the 2 specified IDs over to Gates. The 3600 is the minimum idle time in milliseconds: only those of the specified messages that have been idle for more than 3600 milliseconds are assigned to Gates. Note that Gates is a new consumer that has not been declared before.

Check out the data that Gates has not processed so far:

 redis:6379> XREADGROUP GROUP mygroup01 Gates STREAMS mystream 0
 1) 1) "mystream"
    2) 1) 1) 1531999980272-0
          2) 1) "message"
             2) "orange"
       2) 1) 1531999983493-0
          2) 1) "message"
             2) "strawberry"

There are 2 pieces of data newly allocated.
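The ownership change can be sketched as a small Python function over a dictionary of pending entries. This is a toy model and not the Redis implementation. The function claim and the field names owner, idle_ms and deliveries are invented for this illustration, and the sample values are the ones from the XPENDING output above.

```python
def claim(pending, new_owner, min_idle_ms, entry_ids):
    """Hand over the listed pending entries that have been idle long enough."""
    claimed = []
    for entry_id in entry_ids:
        entry = pending.get(entry_id)
        if entry is None:
            continue                      # not pending at all, nothing to claim
        if entry["idle_ms"] < min_idle_ms:
            continue                      # not idle long enough yet
        entry["owner"] = new_owner
        entry["idle_ms"] = 0              # the idle clock restarts for the new owner
        entry["deliveries"] += 1          # claiming counts as one more delivery
        claimed.append(entry_id)
    return claimed


pending = {
    "1531999980272-0": {"owner": "Bob", "idle_ms": 45126376, "deliveries": 2},
    "1531999983493-0": {"owner": "Tom", "idle_ms": 867475, "deliveries": 1},
}
claimed = claim(pending, "Gates", 3600, ["1531999980272-0", "1531999983493-0"])
print(claimed, pending)
```

The minimum idle time guards against two consumers claiming the same message at once: after the first claim resets the idle time, a second claim with the same threshold skips the message.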

3.5 View related information

View the basic information:

 redis:6379> XINFO STREAM mystream
  1) length
  2) (integer) 15
  3) radix-tree-keys
  4) (integer) 1
  5) radix-tree-nodes
  6) (integer) 2
  7) groups
  8) (integer) 2
  9) first-entry
 10) 1) 1531989605376-0
     2) 1) "sensor-id"
        2) "1234"
        3) "temperature"
        4) "19.8"
 11) last-entry
 12) 1) 1531999991782-0
     2) 1) "message"
        2) "banana"

View the consumer group information:

 redis:6379> XINFO GROUPS mystream
 1) 1) name
    2) "mygroup"
    3) consumers
    4) (integer) 3
    5) pending
    6) (integer) 5
 2) 1) name
    2) "mygroup01"
    3) consumers
    4) (integer) 4
    5) pending
    6) (integer) 2

View the information about consumers in a group:

 redis:6379> XINFO CONSUMERS mystream mygroup
 1) 1) name
    2) "Alice"
    3) pending
    4) (integer) 3
    5) idle
    6) (integer) 2483388
 2) 1) name
    2) "Bob"
    3) pending
    4) (integer) 2
    5) idle
    6) (integer) 48453755
 3) 1) name
    2) "Gates"
    3) pending
    4) (integer) 0
    5) idle
    6) (integer) 2385114

3.7 Delete the message data

First let's check the existing data:

 redis:6379> XRANGE mystream - + COUNT 2
 1) 1) 1531989605376-0
    2) 1) "sensor-id"
       2) "1234"
       3) "temperature"
       4) "19.8"
 2) 1) 1531994510562-0
    2) 1) "test"
       2) "1"

Delete the first piece of data:

 redis:6379> XDEL mystream 1531989605376-0
 (integer) 1

Check again, and you will find that the former first piece of data is gone:

 redis:6379> XRANGE mystream - + COUNT 2
 1) 1) 1531994510562-0
    2) 1) "test"
       2) "1"
 2) 1) 1531994516257-0
    2) 1) "test"
       2) "2"

Note: XDEL does not immediately remove the data from memory. It only marks the data as deleted, and the memory is reclaimed later, once all the entries stored in the same internal node have been deleted.

3.8 Set the maximum length of stream

Add data and specify the maximum length as 2:

 redis:6379> XADD mystream MAXLEN 2 * value 1
 1532049865028-0
 redis:6379> XADD mystream MAXLEN 2 * value 2
 1532049872075-0
 redis:6379> XADD mystream MAXLEN 2 * value 3
 1532049877554-0

We added 3 pieces of data. Now let's take a look at the length of the stream and the current content:

 redis:6379> XLEN mystream
 (integer) 2
 redis:6379> XRANGE mystream - +
 1) 1) 1532049872075-0
    2) 1) "value"
       2) "2"
 2) 1) 1532049877554-0
    2) 1) "value"
       2) "3"

You can see that there are only 2 pieces of data.
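The trimming behaviour resembles a bounded queue that evicts its oldest item when a new one is appended. As a rough analogy only, here is the same sequence of additions using Python's collections.deque. Note one difference: the deque's limit is a property of the container, whereas MAXLEN is an option passed to each XADD call.

```python
from collections import deque

# A bounded container that silently drops its oldest item when full.
stream = deque(maxlen=2)

for value in (1, 2, 3):
    stream.append({"value": value})

print(len(stream), list(stream))
```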

4. Summary

These are the basic operations of Redis streams. You will understand streams better after trying them yourself.