本文最后更新于:2023年8月8日 晚上

服务端

数据结构

type Handler interface {
   ServeHTTP(ResponseWriter, *Request)
}

type Server struct {
   Addr string
   
   // 路由处理器
   Handler Handler // handler to invoke, http.DefaultServeMux if nil

   TLSConfig *tls.Config
   ReadTimeout time.Duration
   ReadHeaderTimeout time.Duration
   WriteTimeout time.Duration
   IdleTimeout time.Duration
   MaxHeaderBytes int
   TLSNextProto map[string]func(*Server, *tls.Conn, Handler)
   ConnState func(net.Conn, ConnState)
   ErrorLog *log.Logger
   BaseContext func(net.Listener) context.Context
   ConnContext func(ctx context.Context, c net.Conn) context.Context

   inShutdown atomicBool // true when server is in shutdown

   disableKeepAlives int32     // accessed atomically.
   nextProtoOnce     sync.Once // guards setupHTTP2_* init
   nextProtoErr      error     // result of http2.ConfigureServer if used

   mu         sync.Mutex
   listeners  map[*net.Listener]struct{}
   activeConn map[*conn]struct{}
   doneChan   chan struct{}
   onShutdown []func()

   listenerGroup sync.WaitGroup
}

// Handler 路由处理器的实现
type ServeMux struct {
   mu    sync.RWMutex
   m     map[string]muxEntry
   es    []muxEntry // slice of entries sorted from longest to shortest.
   hosts bool       // whether any patterns contain hostnames
}

type muxEntry struct {
   h       Handler 
   pattern string
}

ServeMux

  • m map[string]muxEntry:维护path和handler的映射关系
  • es []muxEntry 用于路由前缀匹配
  • muxEntry: 在handler外,冗余保存了path路径信息

handler和路由注册过程

Demo

http.HandleFunc("/ping", func(writer http.ResponseWriter, request *http.Request) {
   _, _ = writer.Write([]byte("pong"))
})

HandleFunc

补充内部单例实现的ServeMux

// DefaultServeMux is the default ServeMux used by Serve.
var DefaultServeMux = &defaultServeMux // 单例

var defaultServeMux ServeMux

func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
   DefaultServeMux.HandleFunc(pattern, handler)
}

ServeMux.HandleFunc

将demo中传入的handler方法,包装成HandlerFunc 该类型实现了上述的 ServeHTTP(w ResponseWriter, r *Request) 接口,实现逻辑是函数自身的调用

// HandleFunc registers the handler function for the given pattern.
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
   if handler == nil {
      panic("http: nil handler")
   }
   mux.Handle(pattern, HandlerFunc(handler))
}

type HandlerFunc func(ResponseWriter, *Request) // 实现Handler 请求处理接口

// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
   f(w, r) // 调用handler自身
}

ServeMux.Handle

路由及方法的注册
如果注册的路径pattern是以 / 结尾,则放入es中的合适位置,用于路径前缀匹配

func (mux *ServeMux) Handle(pattern string, handler Handler) {
   mux.mu.Lock()
   defer mux.mu.Unlock()

   if pattern == "" {
      panic("http: invalid pattern")
   }
   if handler == nil {
      panic("http: nil handler")
   }
   if _, exist := mux.m[pattern]; exist {
      panic("http: multiple registrations for " + pattern)
   }

   if mux.m == nil {
      mux.m = make(map[string]muxEntry)
   }
   e := muxEntry{h: handler, pattern: pattern}
   mux.m[pattern] = e
   if pattern[len(pattern)-1] == '/' {
      mux.es = appendSorted(mux.es, e)
   }

   if pattern[0] != '/' {
      mux.hosts = true
   }
}

服务启动过程

ListenAndServe

func ListenAndServe(addr string, handler Handler) error {
   server := &Server{Addr: addr, Handler: handler}
   return server.ListenAndServe()
}

(srv *Server) ListenAndServe()

调用net包,监听端口

func (srv *Server) ListenAndServe() error {
   if srv.shuttingDown() {
      return ErrServerClosed
   }
   addr := srv.Addr
   if addr == "" {
      addr = ":http"
   }
   ln, err := net.Listen("tcp", addr)
   if err != nil {
      return err
   }
   return srv.Serve(ln)
}

(srv *Server) Serve

func (srv *Server) Serve(l net.Listener) error {
   // ...

   ctx := context.WithValue(baseCtx, ServerContextKey, srv)
   for {
      rw, err := l.Accept() // 调用tcp 的accept方法
   
    // ... 

      c := srv.newConn(rw)
    // ... 
      go c.serve(connCtx)
   }
}
  • l.Accept():在for循环中,会使用net包的tcp accept方法,原理是epoll多路复用技术Linux ,不会持续的自旋占用cpu时间片,而是有请求到来时,才由内核精准唤醒
  • newConn : 封装http请求
  • go c.serve(connCtx): 使用goroutine处理请求

(c *conn) serve

func (c *conn) serve(ctx context.Context) {
// ...
    serverHandler{c.server}.ServeHTTP(w, w.req) // 调用ServeHTTP方法
//...
}
sh serverHandler) ServeHTTP
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
   handler := sh.srv.Handler
   if handler == nil {
      handler = DefaultServeMux // 不传会补充单例的DefaultServeMux
   }
   if req.RequestURI == "*" && req.Method == "OPTIONS" {
      handler = globalOptionsHandler{}
   }

   if req.URL != nil && strings.Contains(req.URL.RawQuery, ";") {
      var allowQuerySemicolonsInUse int32
      req = req.WithContext(context.WithValue(req.Context(), silenceSemWarnContextKey, func() {
         atomic.StoreInt32(&allowQuerySemicolonsInUse, 1)
      }))
      defer func() {
         if atomic.LoadInt32(&allowQuerySemicolonsInUse) == 0 {
            sh.srv.logf("http: URL query contains semicolon, which is no longer a supported separator; parts of the query may be stripped when parsed; see golang.org/issue/25192")
         }
      }()
   }

   handler.ServeHTTP(rw, req) // handler是ServeMux,所以会走到ServeMux的实现
}

(mux *ServeMux) ServeHTTP

  • 找到路径对应的hanlder
  • 调用该handler
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
   if r.RequestURI == "*" {
      if r.ProtoAtLeast(1, 1) {
         w.Header().Set("Connection", "close")
      }
      w.WriteHeader(StatusBadRequest)
      return
   }
   h, _ := mux.Handler(r) // 这里👇
   h.ServeHTTP(w, r)
}

(mux *ServeMux) Handler(

func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {

   // CONNECT requests are not canonicalized.
   if r.Method == "CONNECT" {
      // If r.URL.Path is /tree and its handler is not registered,
      // the /tree -> /tree/ redirect applies to CONNECT requests
      // but the path canonicalization does not.
      if u, ok := mux.redirectToPathSlash(r.URL.Host, r.URL.Path, r.URL); ok {
         return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
      }

      return mux.handler(r.Host, r.URL.Path)
   }

   // All other requests have any port stripped and path cleaned
   // before passing to mux.handler.
   host := stripHostPort(r.Host)
   path := cleanPath(r.URL.Path)

   // If the given path is /tree and its handler is not registered,
   // redirect for /tree/.
   if u, ok := mux.redirectToPathSlash(host, path, r.URL); ok {
      return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
   }

   if path != r.URL.Path {
      _, pattern = mux.handler(host, path)
      u := &url.URL{Path: path, RawQuery: r.URL.RawQuery}
      return RedirectHandler(u.String(), StatusMovedPermanently), pattern
   }

   return mux.handler(host, r.URL.Path)
}
func (mux *ServeMux) handler(host, path string) (h Handler, pattern string) {
   mux.mu.RLock()
   defer mux.mu.RUnlock()

   // Host-specific pattern takes precedence over generic ones
   if mux.hosts {
      h, pattern = mux.match(host + path) // 补充左斜杠
   }
   if h == nil {
      h, pattern = mux.match(path)
   }
   if h == nil {
      h, pattern = NotFoundHandler(), ""
   }
   return
}
func (mux *ServeMux) match(path string) (h Handler, pattern string) {
   // Check for exact match first.
   v, ok := mux.m[path] // 精准匹配,命中即返回对应的handler
   if ok {
      return v.h, v.pattern
   }

   // Check for longest valid match.  mux.es contains all patterns
   // that end in / sorted from longest to shortest.
   // 模糊匹配,es是按pattern从长到短排序,匹配也是从长到短
   for _, e := range mux.es {
      if strings.HasPrefix(path, e.pattern) {
         return e.h, e.pattern // 如果命中pattern的前缀,就返回对应handler
      }
   }
   return nil, ""
}

路由匹配机制
所以实际net/http包实现的路由匹配机制比较粗糙,

  1. 精准匹配,命中map中的pattern 即返回对应的handler
  2. 模糊匹配,在es列表中(按pattern长度从长到短)进行前缀模糊匹配,命中即返回对应的handler

客户端

数据结构

同样有默认的单例实现

type Client struct {
   Transport RoundTripper // 通信模块

   Jar CookieJar
   
   CheckRedirect func(req *Request, via []*Request) error
   
   Timeout time.Duration
}

Transport

RoundTripper 的实现

type Transport struct {
   idleMu       sync.Mutex
   closeIdle    bool                                // user has requested to close all idle conns
   // 放置空闲连接的map,用于复用(连接到同一服务端的)
   idleConn     map[connectMethodKey][]*persistConn // most recently used at end
   idleConnWait map[connectMethodKey]wantConnQueue  // waiting getConns
   idleLRU      connLRU

   reqMu       sync.Mutex
   reqCanceler map[cancelKey]func(error)

   altMu    sync.Mutex   // guards changing altProto only
   altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme

   connsPerHostMu   sync.Mutex
   connsPerHost     map[connectMethodKey]int
   connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns

   Proxy func(*Request) (*url.URL, error)
    // 新连接生成器
   DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

   Dial func(network, addr string) (net.Conn, error)

   DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

   DialTLS func(network, addr string) (net.Conn, error)

   TLSClientConfig *tls.Config

   TLSHandshakeTimeout time.Duration

   DisableKeepAlives bool

   DisableCompression bool

   MaxIdleConns int
   MaxIdleConnsPerHost int
   MaxConnsPerHost int
   IdleConnTimeout time.Duration
   ResponseHeaderTimeout time.Duration
   ExpectContinueTimeout time.Duration
   TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
   ProxyConnectHeader Header
   GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)
   MaxResponseHeaderBytes int64

   WriteBufferSize int

   // ReadBufferSize specifies the size of the read buffer used
   // when reading from the transport.
   // If zero, a default (currently 4KB) is used.
   ReadBufferSize int

   // nextProtoOnce guards initialization of TLSNextProto and
   // h2transport (via onceSetNextProtoDefaults)
   nextProtoOnce      sync.Once
   h2transport        h2Transport // non-nil if http2 wired up
   tlsNextProtoWasNil bool        // whether TLSNextProto was nil when the Once fired

   // ForceAttemptHTTP2 controls whether HTTP/2 is enabled when a non-zero
   // Dial, DialTLS, or DialContext func or TLSClientConfig is provided.
   // By default, use of any those fields conservatively disables HTTP/2.
   // To use a custom dialer or TLS config and still attempt HTTP/2
   // upgrades, set this to true.
   ForceAttemptHTTP2 bool
}

发起tcp连接的步骤

  • 构造http请求参数
  • 获取tcp连接
  • 通过tcp连接发送数据
  • 通过tcp连接接受数据

(t Transport) roundTrip(req *Request)(Response, error)

// roundTrip implements a RoundTripper over HTTP.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
    // ...

   for {
     // ... 
      pconn, err := t.getConn(treq, cm) // 获取tcp连接,会从空闲池优先获取
      if err != nil {
         t.setReqCanceler(cancelKey, nil)
         req.closeBody()
         return nil, err
      }

      var resp *Response
      if pconn.alt != nil {
         // HTTP/2 path.
         t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
         resp, err = pconn.alt.RoundTrip(req)  // tcp的连接
      } else {
         resp, err = pconn.roundTrip(treq) // tcp的连接
      }
      if err == nil {
         resp.Request = origReq
         return resp, nil
      }

        // ... 
   }
}

构造连接 (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error)

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
   pconn = &persistConn{
      t:             t,
      cacheKey:      cm.key(),
      reqch:         make(chan requestAndChan, 1), // 读channel
      writech:       make(chan writeRequest, 1), // 写channel
      closech:       make(chan struct{}),
      writeErrCh:    make(chan error, 1),
      writeLoopDone: make(chan struct{}),
   }
   // ...
   conn, err := t.dial(ctx, "tcp", cm.addr())
    // ...
   pconn.conn = conn

    // ...

   go pconn.readLoop() // 将tcp连接中读取的数据放入读channel
   go pconn.writeLoop()// 将需要写入的tcp连接的数据放入写channel
   return pconn, nil
}

三个loop goroutine

  • pconn.readLoop() :将tcp服务端的响应放入read channel中:生产读channel
  • pconn.writeLoop(): 将需要写入的tcp连接的数据放入写channel:消费写channel
  • 主goroutine:处理程序的请求和响应动作,处理另外两个goroutine的channel的数据(消费读channel数据, 生产写channel数据)
    通过channel,实现三个goroutine的通信, 解耦单个连接通道的阻塞,避免每个请求都要阻塞等待服务端响应,使读写互不干扰

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!

golang对unicode的读写方式 上一篇
drpc的代码生成工具的简单改造 下一篇