CMUX 连接复用器

Cmux

Cmux 是一个用来在同一个端口中绑定多种协议的通用Golang语言库,支持grpc,ssh,https,http协议,

如何使用

// 创建监听端口
lis, err := net.Listen("tcp",":8080")
if err != nil {
    log.Fatal(err)
}
// 创建一个mux
m := cmux.New(l)

// 分别创建配置grpc,http,其他协议的matcher
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
httpL := m.Match(cmux.HTTP1Fast())
trpcL := m.Match(cmux.Any()) // Any means anything that is not yet matched.

grpcS := grpc.NewServer()
grpchello.RegisterGreeterServer(grpcs, &server{})

httpS := &http.Server{
	Handler: &helloHTTP1Handler{},
}

trpcS := rpc.NewServer()
trpcS.Register(&ExampleRPCRcvr{})

// 监听服务
go grpcS.Serve(grpcL)
go httpS.Serve(httpL)
go trpcS.Accept(trpcL)

// 启动cmux
m.Serve()

代码流程分析

首先通过New方法创建一个CMux的接口,这个接口实现了五个方法

// CMux is a multiplexer for network connections.
type CMux interface {
	// 返回一个Listener,新的连接如果被匹配到会发送到对应到listener
	Match(...Matcher) net.Listener
	// MatchWithWriters returns a net.Listener that accepts only the
	// connections that matched by at least of the matcher writers.
	//
	// Prefer Matchers over MatchWriters, since the latter can write on the
	// connection before the actual handler.
	//
	// The order used to call Match determines the priority of matchers.
	MatchWithWriters(...MatchWriter) net.Listener
    // 启动监听新的连接
    // Serve是阻塞的
	Serve() error
	// 自定义处理错误
	HandleError(ErrorHandler)
	// sets a timeout for the read of matchers
	SetReadTimeout(time.Duration)
}

cMux实现了Cmux这个接口

type cMux struct {
    // New方法接收到的Listener
    root        net.Listener
    // 每个matcher的最大的连接数
    bufLen      int
    // // ErrorHandler handles an error and returns whether
    // the mux should continue serving the listener.
    // ErrorHandler 一个error然后判断是否cmux要继续监听
    errh        ErrorHandler
    // 如果连接数大于bufLen后,新来的连接会直接
    donec       chan struct{}
    // 用来保存 listenr和对应的matcher
	sls         []matchersListener
	readTimeout time.Duration
}

matcherListener

type matchersListener struct {
    // 这个切片好用来保存所有传入的Matcher
    ss []MatchWriter
    // 用来保存新建的Conn和listener 这个listenr实际上是New时传入的Listener
	l  muxListener
}

// MatchWriter is a match that can also write response (say to do handshake).
type MatchWriter func(io.Writer, io.Reader) bool

type muxListener struct {
    net.Listener
    // 使用chan来保存收到的连接,默认总数为bufLen 1024
	connc chan net.Conn
}

如何使用Match创建了一个可以带匹配协议的Listener

httpL := m.Match(cmux.HTTP1Fast())

HTTP1Fast是匹配一个请求是否为HTTP1,这个是根据请求头中是否存在常用的请求Method来快速判断的,还有一些其他的Matcher

func (m *cMux) Match(matchers ...Matcher) net.Listener {
	// matchersToMatchWriters 会将传入的Matcher转换为MatchWriter结构
	mws := matchersToMatchWriters(matchers)
	return m.MatchWithWriters(mws...)
}

// MatchWithWriters 方法会返回一个新的Listener
func (m *cMux) MatchWithWriters(matchers ...MatchWriter) net.Listener {
	// muxListener 实现了net.Listener接口
	ml := muxListener{
		// Listener
		Listener: m.root,
		// 创建一个默认大小为1024 net.Conn的chan
		// 这个channel 用来保存当前Match匹配到的连接
		connc:    make(chan net.Conn, m.bufLen),
	}
	// 保存Matcher和创建的ml
	m.sls = append(m.sls, matchersListener{ss: matchers, l: ml})
	return ml
}

cmux开始接收到连接的处理


func (m *cMux) Serve() error {
	var wg sync.WaitGroup

	// 推出时的清理动作
	defer func() {
		close(m.donec)
		wg.Wait()

		for _, sl := range m.sls {
			// 关闭接收连接的channel
			close(sl.l.connc)
			// Drain the connections enqueued for the listener.
			for c := range sl.l.connc {
				_ = c.Close()
			}
		}
	}()

	for {
		// 获取新的连接
		c, err := m.root.Accept()
		if err != nil {
			// 处理连接错误
			// 先用ErrorHandler来处理错误
			// 然后检查错误是否为net.Error
			// 如果还没有退出就说是别的错误,直接返回错误
			if !m.handleErr(err) {
				return err
			}
			// 
			continue
		}
		// 开始处理新的连接
		wg.Add(1)
		go m.serve(c, m.donec, &wg)
	}
}

// 通过循环找找m.sls这个结构来查找可以匹配的Listener
func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	// 包装Conn
	muc := newMuxConn(c)
	if m.readTimeout > noTimeout {
		_ = c.SetReadDeadline(time.Now().Add(m.readTimeout))
	}
	// 
	for _, sl := range m.sls {
		for _, s := range sl.ss {
			// 匹配连接
			matched := s(muc.Conn, muc.startSniffing())
			if matched {
				muc.doneSniffing()
				if m.readTimeout > noTimeout {
					_ = c.SetReadDeadline(time.Time{})
				}
				select {
				// 正确匹配到了就会把新的连接发送到对应的muxListener的channel中,
				// 这个muxlistener实现了net.Listener接口
				// 默认channel的大小为1024 如果大于这个数连接就会被关闭
				case sl.l.connc <- muc:
				case <-donec:
					_ = c.Close()
				}
				return
			}
		}
	}

	// 如果没有匹配到,连接就会被关闭
	// handleErr 会检查错误如果,如果返回false就会关闭listener
	_ = c.Close()
	err := ErrNotMatched{c: c}
	if !m.handleErr(err) {
		_ = m.root.Close()
	}
}

实际的http或grpc服务如何获取连接

初始化时m.Match(cmux.HTTP1Fast())这个方法会返回一个muxListener结构,这个结构实现了net.ListenerAccept方法

func (l muxListener) Accept() (net.Conn, error) {
	c, ok := <-l.connc
	if !ok {
		return nil, ErrListenerClosed
	}
	return c, nil
}

在方法serve中当匹配到连接时,就会把这个连接发送到对应的muxListenerconnc这个channel中,然后Accept方法就可以接收到一个连接并返回,然后就到具体的http或者其他服务来处理这个连接了,到现在为止cmux处理连接的流程基本结束了,