本文最后更新于:2023年8月8日 晚上
服务端
数据结构
// Handler is the interface every request handler implements:
// ServeHTTP writes the reply to the ResponseWriter and returns.
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
// Server defines parameters for running an HTTP server
// (excerpt from the Go net/http source).
type Server struct {
Addr string
// router: dispatches each incoming request to its handler
Handler Handler // handler to invoke, http.DefaultServeMux if nil
TLSConfig *tls.Config
ReadTimeout time.Duration
ReadHeaderTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
MaxHeaderBytes int
TLSNextProto map[string]func(*Server, *tls.Conn, Handler)
ConnState func(net.Conn, ConnState)
ErrorLog *log.Logger
BaseContext func(net.Listener) context.Context
ConnContext func(ctx context.Context, c net.Conn) context.Context
inShutdown atomicBool // true when server is in shutdown
disableKeepAlives int32 // accessed atomically.
nextProtoOnce sync.Once // guards setupHTTP2_* init
nextProtoErr error // result of http2.ConfigureServer if used
mu sync.Mutex
listeners map[*net.Listener]struct{}
activeConn map[*conn]struct{}
doneChan chan struct{}
onShutdown []func()
listenerGroup sync.WaitGroup
}
// ServeMux is the standard library's implementation of the Handler
// interface: an HTTP request multiplexer (router).
type ServeMux struct {
mu sync.RWMutex // guards m, es and hosts
m map[string]muxEntry // exact pattern -> registered entry
es []muxEntry // slice of entries sorted from longest to shortest.
hosts bool // whether any patterns contain hostnames
}
// muxEntry pairs a registered handler with the pattern it was
// registered under (the pattern is stored redundantly beside h).
type muxEntry struct {
h Handler
pattern string
}
ServeMux
- m map[string]muxEntry:维护path和handler的映射关系
- es []muxEntry 用于路由前缀匹配
- muxEntry: 在handler外,冗余保存了path路径信息
handler和路由注册过程
Demo
// Register a handler for /ping on the default mux;
// the handler simply replies with "pong".
http.HandleFunc("/ping", func(writer http.ResponseWriter, request *http.Request) {
_, _ = writer.Write([]byte("pong"))
})
HandleFunc
补充:HandleFunc 内部使用单例实现的 ServeMux
// DefaultServeMux is the default ServeMux used by Serve.
var DefaultServeMux = &defaultServeMux // package-level singleton
var defaultServeMux ServeMux
// HandleFunc registers the handler function for pattern
// on the singleton DefaultServeMux.
func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
DefaultServeMux.HandleFunc(pattern, handler)
}
ServeMux.HandleFunc
将demo中传入的handler方法包装成HandlerFunc。该类型实现了上述的 ServeHTTP(w ResponseWriter, r *Request) 接口,实现逻辑是调用函数自身
// HandleFunc registers the handler function for the given pattern.
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
if handler == nil {
panic("http: nil handler")
}
// Adapt the plain function to the Handler interface via HandlerFunc.
mux.Handle(pattern, HandlerFunc(handler))
}
// HandlerFunc adapts an ordinary function to the Handler interface.
type HandlerFunc func(ResponseWriter, *Request)
// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r) // the wrapped function handles the request itself
}
ServeMux.Handle
路由及方法的注册
如果注册的路径pattern是以 / 结尾,则放入es中的合适位置,用于路径前缀匹配
// Handle registers handler for pattern. Patterns ending in '/' are
// additionally kept in es (sorted longest-first) for prefix matching.
func (mux *ServeMux) Handle(pattern string, handler Handler) {
mux.mu.Lock()
defer mux.mu.Unlock()
if pattern == "" {
panic("http: invalid pattern")
}
if handler == nil {
panic("http: nil handler")
}
if _, exist := mux.m[pattern]; exist {
panic("http: multiple registrations for " + pattern)
}
if mux.m == nil {
mux.m = make(map[string]muxEntry) // lazy init on first registration
}
e := muxEntry{h: handler, pattern: pattern}
mux.m[pattern] = e
if pattern[len(pattern)-1] == '/' {
// trailing-slash pattern: insert into the sorted slice
// used later for longest-prefix matching
mux.es = appendSorted(mux.es, e)
}
if pattern[0] != '/' {
// pattern starts with a hostname, e.g. "example.com/"
mux.hosts = true
}
}
服务启动过程
ListenAndServe
// ListenAndServe builds a Server for addr/handler and starts it
// (handler == nil means DefaultServeMux is used at dispatch time).
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}
(srv *Server) ListenAndServe()
调用net包,监听端口
// ListenAndServe opens a TCP listener via the net package,
// then serves connections on it.
func (srv *Server) ListenAndServe() error {
if srv.shuttingDown() {
return ErrServerClosed
}
addr := srv.Addr
if addr == "" {
addr = ":http" // default address when none configured
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(ln)
}
(srv *Server) Serve
// Serve accepts connections in a loop and handles each one in its
// own goroutine (excerpt; elisions marked with ...).
func (srv *Server) Serve(l net.Listener) error {
// ...
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, err := l.Accept() // blocking TCP accept from the net package
// ...
c := srv.newConn(rw) // wrap the raw conn for HTTP processing
// ...
go c.serve(connCtx) // one goroutine per connection
}
}
- l.Accept():在for循环中,会使用net包的tcp accept方法,原理是Linux的epoll多路复用技术,不会持续地自旋占用cpu时间片,而是有请求到来时,才由内核精准唤醒
- newConn : 封装http请求
- go c.serve(connCtx): 使用goroutine处理请求
(c *conn) serve
// serve processes requests on a single connection (excerpt).
func (c *conn) serve(ctx context.Context) {
// ...
serverHandler{c.server}.ServeHTTP(w, w.req) // dispatch into the server's handler chain
//...
}
(sh serverHandler) ServeHTTP
// ServeHTTP selects the server's Handler — falling back to the
// DefaultServeMux singleton when none was set — and invokes it.
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux // fall back to the singleton mux
}
if req.RequestURI == "*" && req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}
if req.URL != nil && strings.Contains(req.URL.RawQuery, ";") {
var allowQuerySemicolonsInUse int32
req = req.WithContext(context.WithValue(req.Context(), silenceSemWarnContextKey, func() {
atomic.StoreInt32(&allowQuerySemicolonsInUse, 1)
}))
defer func() {
if atomic.LoadInt32(&allowQuerySemicolonsInUse) == 0 {
sh.srv.logf("http: URL query contains semicolon, which is no longer a supported separator; parts of the query may be stripped when parsed; see golang.org/issue/25192")
}
}()
}
handler.ServeHTTP(rw, req) // when handler is a ServeMux, this enters its routing logic
}
(mux *ServeMux) ServeHTTP
- 找到路径对应的handler
- 调用该handler
// ServeHTTP looks up the handler registered for the request path
// and delegates the request to it.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
if r.RequestURI == "*" {
if r.ProtoAtLeast(1, 1) {
w.Header().Set("Connection", "close")
}
w.WriteHeader(StatusBadRequest)
return
}
h, _ := mux.Handler(r) // route lookup — see mux.Handler
h.ServeHTTP(w, r)
}
(mux *ServeMux) Handler
// Handler returns the handler for the request after host/port
// stripping, path cleaning, and trailing-slash redirect handling.
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {
// CONNECT requests are not canonicalized.
if r.Method == "CONNECT" {
// If r.URL.Path is /tree and its handler is not registered,
// the /tree -> /tree/ redirect applies to CONNECT requests
// but the path canonicalization does not.
if u, ok := mux.redirectToPathSlash(r.URL.Host, r.URL.Path, r.URL); ok {
return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
}
return mux.handler(r.Host, r.URL.Path)
}
// All other requests have any port stripped and path cleaned
// before passing to mux.handler.
host := stripHostPort(r.Host)
path := cleanPath(r.URL.Path)
// If the given path is /tree and its handler is not registered,
// redirect for /tree/.
if u, ok := mux.redirectToPathSlash(host, path, r.URL); ok {
return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
}
if path != r.URL.Path {
// cleaning changed the path: redirect to the canonical form
_, pattern = mux.handler(host, path)
u := &url.URL{Path: path, RawQuery: r.URL.RawQuery}
return RedirectHandler(u.String(), StatusMovedPermanently), pattern
}
return mux.handler(host, r.URL.Path)
}
// handler is the internal lookup used by Handler: host-qualified
// patterns first, then plain path patterns, then NotFound.
func (mux *ServeMux) handler(host, path string) (h Handler, pattern string) {
mux.mu.RLock()
defer mux.mu.RUnlock()
// Host-specific pattern takes precedence over generic ones
if mux.hosts {
h, pattern = mux.match(host + path) // match against "host/path"
}
if h == nil {
h, pattern = mux.match(path)
}
if h == nil {
h, pattern = NotFoundHandler(), "" // fall back to a 404 handler
}
return
}
// match finds the handler for path: exact map hit first, otherwise
// longest registered trailing-slash prefix.
func (mux *ServeMux) match(path string) (h Handler, pattern string) {
// Check for exact match first.
v, ok := mux.m[path] // an exact hit in the pattern map wins immediately
if ok {
return v.h, v.pattern
}
// Check for longest valid match. mux.es contains all patterns
// that end in / sorted from longest to shortest.
for _, e := range mux.es {
if strings.HasPrefix(path, e.pattern) {
return e.h, e.pattern // first (i.e. longest) matching prefix wins
}
}
return nil, ""
}
路由匹配机制
所以实际上net/http包实现的路由匹配机制比较粗糙:
- 精准匹配,命中map中的pattern 即返回对应的handler
- 模糊匹配,在es列表中(按pattern长度从长到短)进行前缀模糊匹配,命中即返回对应的handler
客户端
数据结构
同样有默认的单例实现
// Client is an HTTP client (the package also ships a default
// singleton instance).
type Client struct {
Transport RoundTripper // connection/transport layer
Jar CookieJar
CheckRedirect func(req *Request, via []*Request) error
Timeout time.Duration
}
Transport
RoundTripper 的实现
// Transport is the RoundTripper implementation that manages TCP
// connections, their reuse, and the actual request/response exchange.
type Transport struct {
idleMu sync.Mutex
closeIdle bool // user has requested to close all idle conns
// idle connections kept for reuse, keyed by target (same-server reuse)
idleConn map[connectMethodKey][]*persistConn // most recently used at end
idleConnWait map[connectMethodKey]wantConnQueue // waiting getConns
idleLRU connLRU
reqMu sync.Mutex
reqCanceler map[cancelKey]func(error)
altMu sync.Mutex // guards changing altProto only
altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme
connsPerHostMu sync.Mutex
connsPerHost map[connectMethodKey]int
connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns
Proxy func(*Request) (*url.URL, error)
// factory functions for establishing new connections
DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
Dial func(network, addr string) (net.Conn, error)
DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
DialTLS func(network, addr string) (net.Conn, error)
TLSClientConfig *tls.Config
TLSHandshakeTimeout time.Duration
DisableKeepAlives bool
DisableCompression bool
MaxIdleConns int
MaxIdleConnsPerHost int
MaxConnsPerHost int
IdleConnTimeout time.Duration
ResponseHeaderTimeout time.Duration
ExpectContinueTimeout time.Duration
TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
ProxyConnectHeader Header
GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)
MaxResponseHeaderBytes int64
WriteBufferSize int
// ReadBufferSize specifies the size of the read buffer used
// when reading from the transport.
// If zero, a default (currently 4KB) is used.
ReadBufferSize int
// nextProtoOnce guards initialization of TLSNextProto and
// h2transport (via onceSetNextProtoDefaults)
nextProtoOnce sync.Once
h2transport h2Transport // non-nil if http2 wired up
tlsNextProtoWasNil bool // whether TLSNextProto was nil when the Once fired
// ForceAttemptHTTP2 controls whether HTTP/2 is enabled when a non-zero
// Dial, DialTLS, or DialContext func or TLSClientConfig is provided.
// By default, use of any those fields conservatively disables HTTP/2.
// To use a custom dialer or TLS config and still attempt HTTP/2
// upgrades, set this to true.
ForceAttemptHTTP2 bool
}
发起tcp连接的步骤
- 构造http请求参数
- 获取tcp连接
- 通过tcp连接发送数据
- 通过tcp连接接收数据
(t *Transport) roundTrip(req *Request) (*Response, error)
// roundTrip implements a RoundTripper over HTTP.
// (excerpt; elisions marked with ...)
func (t *Transport) roundTrip(req *Request) (*Response, error) {
// ...
for {
// ...
pconn, err := t.getConn(treq, cm) // get a TCP connection, preferring the idle pool
if err != nil {
t.setReqCanceler(cancelKey, nil)
req.closeBody()
return nil, err
}
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req) // send over the HTTP/2 transport
} else {
resp, err = pconn.roundTrip(treq) // send over the persistent TCP connection
}
if err == nil {
resp.Request = origReq
return resp, nil
}
// ...
}
}
构造连接 (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error)
// dialConn builds a persistConn: it dials the TCP connection and
// starts the two loop goroutines that service its channels (excerpt).
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
pconn = &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1), // read channel
writech: make(chan writeRequest, 1), // write channel
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
// ...
conn, err := t.dial(ctx, "tcp", cm.addr())
// ...
pconn.conn = conn
// ...
go pconn.readLoop() // reads server responses from the TCP conn, feeding the read channel
go pconn.writeLoop()// consumes the write channel, writing requests to the TCP conn
return pconn, nil
}
三个loop goroutine
- pconn.readLoop() :将tcp服务端的响应放入read channel中:生产读channel
- pconn.writeLoop(): 从写channel中取出待发送的数据,写入tcp连接:消费写channel
- 主goroutine:处理程序的请求和响应动作,处理另外两个goroutine的channel的数据(消费读channel数据, 生产写channel数据)
通过channel,实现三个goroutine的通信, 解耦单个连接通道的阻塞,避免每个请求都要阻塞等待服务端响应,使读写互不干扰
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!