Redis协议

介绍

在工作中对于redis的使用还是比较多的,因为它的强大,而redis的通信协议也是比较简洁的

redis新版统一请求协议在redis1.2版本正式的引用,并最终在redis2.0版本正式成为redis服务器通信的标准方式,你可以依照这个协议来自已实现一个redis客户端来和redis服务器交互
redis通信协议的格式

请求

*<参数数量>CR LF
$<参数1的字节数量>CR LF
<参数1的数据><>CR LF
...
$<参数N的字节数量>CR LF
<参数N的数据>CR LF

CR LF就是\r\n符号      

🌰以下就是一个命令的打印版本

*3
$3
SET
$5
mykey
$7
myvalue

这个是因为\r\n符号所以换行了
实际的协议是这样的

"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"

回复

Redis命令会返回多种不同类型的回复

通过检查服务器发回数据的第一个字节,就可以确定这个回复是什么类型

第一个字节是+, “+OK\r\n”

第一个字节为-, “-ERR unknown command ‘foobar’”

第一个字节为:, “:0\r\n”

第一个字节为$,然后接下来跟着的是表示实际回复长度的数字值,之后跟CRLF,然后跟实际回复数据,最末尾会加上CRLF

第一个字节为*,然后是回复的条数,然后是具体数据

使用Golang实现redis客户端

了解了具体的协议后我们来使用golang写一个简单的redis客户端(就不处理返回了)

package main

import (
	"bytes"
	"fmt"
	"log"
	"net"
	"os"
	"strconv"
)

// 生成redis请求协议格式的数据
func genReq(args ...string) []byte {
	var buf bytes.Buffer

	buf.WriteString("*" + strconv.Itoa(len(args)) + "\r\n")

	for _, item := range args {
		buf.WriteString("$" + strconv.Itoa(len(item)) + "\r\n")
		buf.WriteString(item + "\r\n")
	}
	return buf.Bytes()
}

func main() {
	c, err := net.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		log.Fatal(err)
	}
	req := genReq(os.Args[1:]...)
    // 发送数据
	_, err = c.Write(req)
	if err != nil {
		log.Fatal(err)
	}

	// 读取数据
	output  := make([]byte,1024)
	n, err := c.Read(output)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("output:\n%s\n", output[:n])
	//...解析值省略
}

然后就可以根据redis的回复格式来解析不同的返回值

然后顺便了解下https://github.com/go-redis/redis这个强大的redis客户端

	r := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	fmt.Println(r.Set("key", "value", 0).Err())

这个库也非常的简单,这样就可以对一个key进行操作了,但是如果value值是自定义的类型,那就就有可能报错

redis: can't marshal define.TaskStatus (implement encoding.BinaryMarshaler)

因为没有实现这个接口,所以不能序列化,实现这个接口后就不会报错了

来看下这个库的发送流程
先是初始化客户端

r := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})

// NewClient returns a client to the Redis Server specified by Options.
func NewClient(opt *Options) *Client {
	opt.init() // 初始化配置

	c := Client{
		baseClient: newBaseClient(opt, newConnPool(opt)), // 初始化客户端
		ctx:        context.Background(),
	}
	c.cmdable = c.Process // 命令处理函数

	return &c
}

然后看一下这个库如何创建先来创建连接池的函数newConnPool(opt)

func newConnPool(opt *Options) *pool.ConnPool {
	return pool.NewConnPool(&pool.Options{ // 创建连接池
		Dialer: func(ctx context.Context) (net.Conn, error) {
            // 建立连接
			return opt.Dialer(ctx, opt.Network, opt.Addr)
		},
		PoolSize:           opt.PoolSize, // 大小
		MinIdleConns:       opt.MinIdleConns, 
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: opt.IdleCheckFrequency,
	})
}

然后接着看pool.NewConnPool这个真正创建连接池的函数,可以看到先是做了一些默认参数的初始化,然后开始执行检查空闲连接的函数,

func NewConnPool(opt *Options) *ConnPool {
	p := &ConnPool{
		opt: opt,

		queue:     make(chan struct{}, opt.PoolSize),
		conns:     make([]*Conn, 0, opt.PoolSize),
		idleConns: make([]*Conn, 0, opt.PoolSize),
		closedCh:  make(chan struct{}),
	}
	// 检查空闲连接
	p.checkMinIdleConns()

	if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
		go p.reaper(opt.IdleCheckFrequency)
	}

	return p
}


checkMinIdleConns这个函数是检查空闲连接如果当前空闲连接少的话,就创建一个新的连接后放到连接池

func (p *ConnPool) checkMinIdleConns() {
	if p.opt.MinIdleConns == 0 {
		return
	}
	for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
		p.poolSize++
		p.idleConnsLen++
		go func() {
            // 创建新的连接
			err := p.addIdleConn()
			if err != nil {
				p.connsMu.Lock()
				p.poolSize--
				p.idleConnsLen--
				p.connsMu.Unlock()
			}
		}()
	}
}

接下来看下addIdleConn这个函数


func (p *ConnPool) addIdleConn() error {
    // 创建连接
	cn, err := p.dialConn(context.TODO(), true)
	if err != nil {
		return err
	}

	p.connsMu.Lock()
	p.conns = append(p.conns, cn)
	p.idleConns = append(p.idleConns, cn)
	p.connsMu.Unlock()
	return nil
}

func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
	if p.closed() {
		return nil, ErrClosed
	}

	if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
		return nil, p.getLastDialError()
	}
	// 创建新的连接
	netConn, err := p.opt.Dialer(ctx)
	if err != nil {
		p.setLastDialError(err)
		if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
			go p.tryDial()
		}
		return nil, err
	}
	// 将这个连接做一些参数初始化
	cn := NewConn(netConn)
	cn.pooled = pooled
	return cn, nil
}

func NewConn(netConn net.Conn) *Conn {
	cn := &Conn{
		netConn:   netConn,
		createdAt: time.Now(),
	}
    // 将连接放入bufio中
	cn.rd = proto.NewReader(netConn)
	cn.wr = proto.NewWriter(netConn)
	cn.SetUsedAt(time.Now())
	return cn
}


func NewReader(rd io.Reader) *Reader {
	return &Reader{
        // 使用bufio包装连接
        // 后续会使用rd.Write来给写入协议参数
        // 然后调度Flush就会讲协议的所有数据写入这个连接中
		rd:   bufio.NewReader(rd),
		_buf: make([]byte, 64),
	}
}

这就是一个连接的初始化过程,连接创建好后就会被调用直到关闭。
接下来还有一个比较重要的函数就是c.cmdable = c.Process这个函数,用来生成redis协议格式的数据的函数,

func (c *Client) Process(cmd Cmder) error {
	return c.ProcessContext(c.ctx, cmd)
}

func (c *Client) ProcessContext(ctx context.Context, cmd Cmder) error {
    // 这个redis库可以做一些自定义一些hook的函数,并分为,命令运行前,命令运行后执行
	return c.hooks.process(ctx, cmd, c.baseClient.process)
}

// 
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
	err := c._process(ctx, cmd)
	if err != nil {
		cmd.SetErr(err)
		return err
	}
	return nil
}


func (c *baseClient) _process(ctx context.Context, cmd Cmder) error {
	var lastErr error
    // 最多尝试
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		retryTimeout := true
        // withConn接收个函数,函数有两个参数,ctx和pool.Conn连接
		lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
            // WithWriter将会把生成的redis协议数据写入这个初始化的连接中
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmd(wr, cmd) // 这个函数主要是用来生成reids协议数据
			})
			if err != nil {
				return err
			}

			err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
			if err != nil {
				retryTimeout = cmd.readTimeout() == nil
				return err
			}

			return nil
		})
		if lastErr == nil || !isRetryableError(lastErr, retryTimeout) {
			return lastErr
		}
	}
	return lastErr
}


看看withConn这个函数,就是创建一个连接然后执行传入的函数也就是WithWriter,然后释放连接

func (c *baseClient) withConn(
	ctx context.Context, fn func(context.Context, *pool.Conn) error,
) error {
	cn, err := c.getConn(ctx)
	if err != nil {
		return err
	}
	defer func() {
		c.releaseConn(cn, err)
	}()

	err = fn(ctx, cn)
	return err
}

然后看看WithWriter这个函数

func (cn *Conn) WithWriter(
	ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
) error {
    // 设置断开时间
	err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout))
	if err != nil {
		return err
	}

	if cn.wr.Buffered() > 0 {
		cn.wr.Reset(cn.netConn)
	}
	// 执行传入的参数,也就是生成redis协议数据的函数writeCmd
	err = fn(cn.wr)
	if err != nil {
		return err
	}
	// Flush会将数据写入连接中,初始化连接的时候,最后将连接传入到了bufio里,
    // 这是bufio的一个对象
    // 然后就会发送给redis服务器
	return cn.wr.Flush()
}

最后看下生成redis协议的函数writeCmd

func writeCmd(wr *proto.Writer, cmd Cmder) error {
	return wr.WriteArgs(cmd.Args())
}

func (w *Writer) WriteArgs(args []interface{}) error {
    // 写入请求协议前缀*
	err := w.wr.WriteByte(ArrayReply)
	if err != nil {
		return err
	}
	// 写入参数长度
	err = w.writeLen(len(args))
	if err != nil {
		return err
	}
	// 循环每一个参数检查并写入	
	for _, arg := range args {
		err := w.writeArg(arg)
		if err != nil {
			return err
		}
	}

	return nil
}

// 检查参数
// 从这个函数可以看出,传入的参数必须是这里面的类型
// 如果是自定义的类型就必须实现encoding.BinaryMarshaler这个接口,否则就会报错
func (w *Writer) writeArg(v interface{}) error {
	switch v := v.(type) {
	case nil:
		return w.string("")
	case string:
		return w.string(v)
	case []byte:
		return w.bytes(v)
	case int:
		return w.int(int64(v))
	case int8:
		return w.int(int64(v))
	case int16:
		return w.int(int64(v))
	case int32:
		return w.int(int64(v))
	case int64:
		return w.int(v)
	case uint:
		return w.uint(uint64(v))
	case uint8:
		return w.uint(uint64(v))
	case uint16:
		return w.uint(uint64(v))
	case uint32:
		return w.uint(uint64(v))
	case uint64:
		return w.uint(v)
	case float32:
		return w.float(float64(v))
	case float64:
		return w.float(v)
	case bool:
		if v {
			return w.int(1)
		}
		return w.int(0)
	case time.Time:
		return w.string(v.Format(time.RFC3339))
	case encoding.BinaryMarshaler:
		b, err := v.MarshalBinary()
		if err != nil {
			return err
		}
		return w.bytes(b)
	default:
		return fmt.Errorf(
			"redis: can't marshal %T (implement encoding.BinaryMarshaler)", v)
	}
}

// 最终都会使用这个函数简化数据写入bufio的buf中
func (w *Writer) bytes(b []byte) error {
	err := w.wr.WriteByte(StringReply)
	if err != nil {
		return err
	}

	err = w.writeLen(len(b))
	if err != nil {
		return err
	}

	_, err = w.wr.Write(b)
	if err != nil {
		return err
	}

	return w.crlf()
}

到了这里就基本过完了这个库的连接、请求过程,算是一个大致的了解学习