Redis from scratch: finishing the client
We are ready to finish the rest of the client library and write a sample program to test it.
For simplicity sake, the client library will support only a small number of the most basic commands and the most basic data types, and without any of the options (such as the NX
option for SET
):
SET key val
where both key and val are UTF-8 stringsGET key
where the key is UTF-8 stringDEL key
where the key is UTF-8 string
Hence, we have the top layer abstraction that will be published to the users of the client library:
pub struct Client {
// ... the internal state ...
}
impl Client {
/// For connecting to a Redis server
pub async fn connect<T: ToSocketAddrs>(addr: T) -> MyResult<Self> { todo!(); }
/// For sending a SET command and receiving the response
pub async fn set(&mut self, key: &str, val: &str) -> MyResult<()> { todo!(); }
/// For sending a GET command and receiving the response
pub async fn get(&mut self, key: &str) -> MyResult<Option<Bytes>> { todo!(); }
/// For sending a DEL command and, well, you know the rest
pub async fn del(&mut self, key: &str) -> MyResult<Option<i64>> { todo!(); }
}
Connecting to server
When calling Client::connect
, the client initiates a TCP connection to the address specified by opening a TCP socket that will then be used for reading and writing bytes. For more details regarding using tokio
’s TCP APIs, see this post
impl Client {
/// Connect to the server specified at the input address, or return any
/// error while attempting the connect
pub async fn connect<T: ToSocketAddrs>(addr: T) -> MyResult<Self> {
let socket = TcpStream::connect(addr).await?;
return Ok(Self {
connection: Connection::new(socket),
});
}
}
Layers of abstraction
Between a Redis command and the raw bytes that are transmitted in the network, two layers of abstraction help make the code more readable: Connection
, Frame
and Command
(there is also Bytes
for buffered I/O on raw bytes Vec<u8>
but we will use the ready made crate bytes::Bytes
so we will discuss that in this post).
Between frame and socket
Connection
bridges the gap between Frame
and the network socket. It wraps around the TCP socket and provides two methods for parsing Frame
from the socket and for writing Frame
to the socket.
Writing Frame
to socket is trivial: serialize the frame to a byte array, then write the entire byte array to the socket. Parsing Frame
from the socket is a bit more involved, and the logic here largely imitates the implementation in the actual mini-redis here, which is to keep reading from the socket until either the TCP connection is dropped or a valid frame can be parsed:
/// A wrapper around a TCP socket (TcpStream) for writing byte stream into
/// Bytes and for parsing Bytes into frames
struct Connection {
socket: TcpStream,
}
impl Connection {
/// Instantiate a new connection
fn new(socket: TcpStream) -> Self {
return Self { socket };
}
/// Read bytes from the TcpStream, then parse it. If there is a valid
/// Frame in the bytes read, then return it. Else return None.
async fn read_frame(&mut self) -> Result<Option<Frame>, Box<dyn Error>> {
let mut buf = BytesMut::with_capacity(4096);
loop {
// TODO: unnecessary copy but oh well
if let Some(frame) = Frame::parse(&mut Bytes::from(buf.to_vec())) {
return Ok(Some(frame));
}
self.socket.readable().await?;
let nbytes = self.socket.read_buf(&mut buf).await?;
if nbytes == 0 {
return Ok(None);
}
}
}
/// Convert the input frame into bytes, then write into the socket
async fn write_frame(&mut self, frame: &Frame) -> Result<usize, Box<dyn Error>> {
self.socket.writable().await?;
let nbytes = self.socket.write(&frame.serialize()).await?;
return Ok(nbytes);
}
}
On the other hand, Frame
is an abstraction over Bytes
and the details, including how frames serialize into bytes and how frames are parsed from bytes, are discussed in this post.
Commands
Command
is an abstraction over Frame
to further assist with the readability of the code. The enum defines the possible commands including their arguments, and the various methods translate them to and from raw Frame
:
#[derive(Debug, Clone, Eq, PartialEq)]
enum Command {
Set { key: Bytes, val: Bytes },
Get { key: Bytes },
Del { key: Bytes },
}
impl Command {
/// Create a new Set command
fn set(key: Bytes, val: Bytes) -> Self {
return Self::Set { key, val };
}
/// Create a new Get command
fn get(key: Bytes) -> Self {
return Self::Get { key };
}
/// Create a new Pop command
fn del(key: Bytes) -> Self {
return Self::Del { key };
}
/// Convert a command into the appropriate Frame
fn to_frame(&self) -> Frame {
return match self {
Self::Set { key, val } => Frame::Array(vec![
Frame::Bulk(Bytes::from("SET")),
Frame::Bulk(Bytes::copy_from_slice(key)),
Frame::Bulk(Bytes::copy_from_slice(val)),
]),
Self::Get { key } => Frame::Array(vec![
Frame::Bulk(Bytes::from("GET")),
Frame::Bulk(Bytes::copy_from_slice(key)),
]),
Self::Del { key } => Frame::Array(vec![
Frame::Bulk(Bytes::from("DEL")),
Frame::Bulk(Bytes::copy_from_slice(key)),
]),
};
}
/// Parse a frame back into a command. If the frame does not correspond to
/// any of the supported commands, return None
///
/// If the frame does not strictly conform to the expected format of the
/// command, this method will return None. For example, if the input frame
/// has more than three elements, then it will never be parsed into a SET
/// command even if the first three elements form a valid SET command.
fn parse_command(frame: &Frame) -> Option<Self> {
if let Frame::Array(frames) = frame {
match frames.get(0) {
Some(Frame::Bulk(bytes)) if bytes == &Bytes::from("SET") => {
if frames.len() != 3 {
return None;
}
// unwrapping is ok because the length is already guaranteed
let key = frames.get(1).unwrap();
let val = frames.get(2).unwrap();
if let Frame::Bulk(key) = key {
if let Frame::Bulk(val) = val {
return Some(Self::set(
Bytes::copy_from_slice(key),
Bytes::copy_from_slice(val),
));
}
}
return None;
}
Some(Frame::Bulk(bytes)) if bytes == &Bytes::from("GET") => {
if frames.len() != 2 {
return None;
}
let key = frames.get(1).unwrap();
if let Frame::Bulk(key) = key {
return Some(Self::get(Bytes::copy_from_slice(&key)));
}
return None;
}
Some(Frame::Bulk(bytes)) if bytes == &Bytes::from("DEL") => {
if frames.len() != 2 {
return None;
}
let key = frames.get(1).unwrap();
if let Frame::Bulk(key_bytes) = key {
return Some(Self::del(Bytes::copy_from_slice(&key_bytes)));
}
return None;
}
_ => {
return None;
}
}
}
// For now, all commands must be arrays
return None;
}
}
A sample program
The client library is now complete (at least within our meager scope of three commands and with no performance requirements to begin with). We can test it by running a Redis server (easily done with containers) and writing a simple program to interface with the server:
//! the sample program
//!
//! to test it, first run an instance of Redis server, such as with containers:
//! docker run --rm -p 6379:6379 redis:latest
use crate::Client;
#[tokio::main]
async fn main() {
let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
client.set("foo", "bar").await.unwrap();
println!("Set 'foo' to 'bar'");
println!("Get foo: {:?}", client.get("foo").await.unwrap());
client.set("foo", "baz").await.unwrap();
println!("Set 'foo' to 'baz'");
println!("Get foo: {:?}", client.get("foo").await.unwrap());
println!("{:?} keys deleted", client.del("foo").await.unwrap());
println!("Get foo: {:?}", client.get("foo").await.unwrap());
}
The program should return the following:
Set 'foo' to 'bar'
Get foo: Some(b"bar")
Set 'foo' to 'baz'
Get foo: Some(b"baz")
Some(1) keys deleted
Get foo: None