Analyzing High-Performance Network Framework nbio in Go

A Deep Dive into Efficient Network Programming

 

Introduction

In this article, we’ll continue our in-depth analysis of another high-performance network programming framework: nbio.
The nbio project also includes nbhttp built on top of nbio, but that’s outside the scope of our discussion.
Like evio, nbio adopts the classic Reactor pattern. In fact, many asynchronous network frameworks in Go are designed based on this pattern.
Let’s start by running a small nbio echo example.

Server:

package main
 
 import (
     "fmt"
     "github.com/lesismal/nbio"
 )
 
 func main() {
     g := nbio.NewGopher(nbio.Config{
         Network:            "tcp",
         Addrs:              []string{":8888"},
         MaxWriteBufferSize: 6 * 1024 * 1024,
     })
 
     g.OnData(func(c *nbio.Conn, data []byte) {
         c.Write(append([]byte{}, data...))
     })
 
     err := g.Start()
     if err != nil {
         fmt.Printf("nbio.Start failed: %v\n", err)
         return
     }
 
     defer g.Stop()
     g.Wait()
 }

Here, we create a new Engine instance with nbio.NewGopher(), passing an nbio.Config struct that configures it:

  • Network: The network type to use, “tcp” in this case.
  • Addrs: The addresses the server listens on, here “:8888” (port 8888 on the local machine).
  • MaxWriteBufferSize: The maximum size of the write buffer, set to 6 MB here.

The remaining configuration options are left for you to explore. Next, we register a data callback on the Engine instance with g.OnData(). It is invoked whenever data is received and takes two parameters: the connection object c and the received bytes data. Inside the callback, we call c.Write() to send the received data back to the client.
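The callback copies data with append([]byte{}, data...) before handing it to Write. A plausible reason, given that each poller owns a reusable ReadBuffer (shown later in this article), is that the slice passed to the callback may be overwritten by the next read. The following standard-library sketch illustrates that hazard; readBuf, retainAlias, and retainCopy are hypothetical names and not part of nbio.

```go
package main

import "fmt"

// retainAlias keeps a reference to the caller's slice without copying.
func retainAlias(data []byte) []byte { return data }

// retainCopy returns an independent copy, using the same idiom as
// append([]byte{}, data...) in the echo server.
func retainCopy(data []byte) []byte { return append([]byte{}, data...) }

func main() {
	readBuf := make([]byte, 5) // stands in for a reusable read buffer

	copy(readBuf, "hello")
	alias := retainAlias(readBuf[:5])
	copied := retainCopy(readBuf[:5])

	copy(readBuf, "world") // the next "read" overwrites the buffer

	fmt.Println(string(alias))  // world: the alias sees the overwrite
	fmt.Println(string(copied)) // hello: the copy is unaffected
}
```

The copy costs one allocation per callback, which is the price of being able to keep the bytes after the callback returns.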

Client:

package main
 
 import (
     "bytes"
     "context"
     "fmt"
     "math/rand"
     "time"
     "github.com/lesismal/nbio"
     "github.com/lesismal/nbio/logging"
 )
 
 func main() {
     var (
         ret  []byte
         buf  = make([]byte, 1024*1024*4)
         addr = "localhost:8888"
         ctx, _ = context.WithTimeout(context.Background(), 60*time.Second)
     )
 
     logging.SetLevel(logging.LevelInfo)
     rand.Read(buf)
 
     g := nbio.NewGopher(nbio.Config{})
     done := make(chan int)
 
     g.OnData(func(c *nbio.Conn, data []byte) {
         ret = append(ret, data...)
         if len(ret) == len(buf) {
             if bytes.Equal(buf, ret) {
                 close(done)
             }
         }
     })
 
     err := g.Start()
     if err != nil {
         fmt.Printf("Start failed: %v\n", err)
         return // nothing useful can happen without a running engine
     }
 
     defer g.Stop()
 
     c, err := nbio.Dial("tcp", addr)
     if err != nil {
         fmt.Printf("Dial failed: %v\n", err)
         return // c is nil here, so do not go on to use it
     }
 
     g.AddConn(c)
     c.Write(buf)
 
     select {
     case <-ctx.Done():
         logging.Error("timeout")
     case <-done:
         logging.Info("success")
     }
 }

At first glance this might seem a bit cumbersome, but the server and client actually share the same set of structures.

The client connects to the server with nbio.Dial, and on success the connection is wrapped in an nbio.Conn, which implements the standard library’s net.Conn interface. The client then registers the connection with g.AddConn(c) and writes data to the server. The server’s handling logic simply sends the data back unchanged. Each time the client receives data, its OnData callback fires and appends the bytes to ret. Once the accumulated length matches the length sent and the contents are equal, the callback closes the done channel, which lets main log “success” and exit.
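Because TCP is a byte stream, the echoed data may arrive in several OnData calls, which is why the client accumulates into ret before comparing. Here is a minimal standard-library sketch of that accumulate-then-signal pattern; collector and its methods are hypothetical names used only for illustration.

```go
package main

import (
	"bytes"
	"fmt"
)

// collector accumulates chunks until it holds exactly the expected bytes.
type collector struct {
	want []byte
	got  []byte
	done chan struct{}
}

func newCollector(want []byte) *collector {
	return &collector{want: want, done: make(chan struct{})}
}

// finished reports, without blocking, whether done has been closed.
func (c *collector) finished() bool {
	select {
	case <-c.done:
		return true
	default:
		return false
	}
}

// onData mirrors the client's callback: append the chunk, then close done
// once the accumulated bytes match what was sent.
func (c *collector) onData(chunk []byte) {
	c.got = append(c.got, chunk...)
	if !c.finished() && len(c.got) == len(c.want) && bytes.Equal(c.got, c.want) {
		close(c.done)
	}
}

func main() {
	c := newCollector([]byte("hello, nbio"))
	for _, chunk := range []string{"hel", "lo, ", "nbio"} {
		c.onData([]byte(chunk))
		fmt.Println(len(c.got), c.finished())
	}
	// Prints:
	// 3 false
	// 7 false
	// 11 true
}
```

Like the client in the article, this sketch assumes the callback is never invoked concurrently for the same connection.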

Now, let’s delve into a few key structures.

type Engine struct {
     //...
     sync.WaitGroup
     //...
     mux                        sync.Mutex
     wgConn                     sync.WaitGroup
     network                    string
     addrs                      []string
     //...
     connsStd                   map[*Conn]struct{}
     connsUnix                  []*Conn
     listeners                  []*poller
     pollers                    []*poller
     onOpen                     func(c *Conn)
     onClose                    func(c *Conn, err error)
     onRead                     func(c *Conn)
     onData                     func(c *Conn, data []byte)
     onReadBufferAlloc          func(c *Conn) []byte
     onReadBufferFree           func(c *Conn, buffer []byte)
     //...
 }

The Engine is essentially the core manager, responsible for all listener pollers and worker pollers.

What’s the difference between these two types of pollers?

The difference lies in their responsibilities.

The listener poller is responsible only for accepting new connections. When a new client conn arrives, it selects a worker poller from pollers and adds conn to the corresponding worker poller. Subsequently, the worker poller is responsible for handling the read/write events of this conn.

Therefore, when the program starts and listens on only one address, the number of pollers equals 1 (listener poller) + pollerNum.

From the fields above, you can customize some configurations and callbacks. For example, you can set a callback function onOpen when a new connection arrives, or set a callback function onData when data arrives, etc.

type Conn struct {
     mux                   sync.Mutex
     p                     *poller
     fd                    int
     //...
     writeBuffer           []byte
     //...
     DataHandler           func(c *Conn, data []byte)
 }

The Conn structure represents a network connection, and each conn belongs to exactly one poller. When data cannot be written in full at once, the remainder is stored in writeBuffer until the next writable event, when writing resumes.

type poller struct {
     g             *Engine
     epfd          int
     evtfd         int
     index         int
     shutdown      bool
     listener      net.Listener
     isListener    bool
     unixSockAddr  string
     ReadBuffer    []byte
     pollType      string
 }

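The poller structure is an abstraction over the underlying I/O multiplexing mechanism (such as epoll on Linux or kqueue on Darwin).

Pay attention to the epoll trigger mode: nbio defaults to level-triggered (LT) mode, but users can also set it to edge-triggered (ET) mode. Note that the mode is selected through the Engine’s epollMod setting, which addRead switches on further below, while pollType shows up in the log messages of acceptorLoop as a label for the poller.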
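To make the LT/ET distinction concrete, here is a small Linux-only sketch that uses the standard syscall package directly and does not involve nbio. It registers a pipe with epoll, writes one byte, and polls twice without reading. pollTwice and epollET are names invented for this example.

```go
package main

import (
	"fmt"
	"syscall"
)

// epollET is the edge-triggered flag (bit 31). It is defined locally because
// syscall.EPOLLET is a negative constant on some architectures and does not
// fit the uint32 Events field there.
const epollET = 1 << 31

// pollTwice registers the read end of a pipe with the given extra flags,
// writes one byte, and returns how many events two consecutive non-blocking
// EpollWait calls report when the byte is never read.
func pollTwice(extra uint32) (first, second int, err error) {
	epfd, err := syscall.EpollCreate1(0)
	if err != nil {
		return 0, 0, err
	}
	defer syscall.Close(epfd)

	var p [2]int
	if err = syscall.Pipe(p[:]); err != nil {
		return 0, 0, err
	}
	defer syscall.Close(p[0])
	defer syscall.Close(p[1])

	ev := syscall.EpollEvent{Events: syscall.EPOLLIN | extra, Fd: int32(p[0])}
	if err = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, p[0], &ev); err != nil {
		return 0, 0, err
	}
	if _, err = syscall.Write(p[1], []byte{1}); err != nil {
		return 0, 0, err
	}

	events := make([]syscall.EpollEvent, 8)
	if first, err = syscall.EpollWait(epfd, events, 0); err != nil {
		return 0, 0, err
	}
	second, err = syscall.EpollWait(epfd, events, 0)
	return first, second, err
}

func main() {
	lt1, lt2, err := pollTwice(0)
	if err != nil {
		fmt.Println("epoll failed:", err)
		return
	}
	et1, et2, err := pollTwice(epollET)
	if err != nil {
		fmt.Println("epoll failed:", err)
		return
	}
	fmt.Println("level-triggered:", lt1, lt2)
	fmt.Println("edge-triggered: ", et1, et2)
}
```

In level-triggered mode the pipe should be reported as readable on both calls because the byte is still unread, while in edge-triggered mode it should be reported only once. This is why ET handlers have to keep reading until the socket is drained.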

After introducing the basic structures, let’s move on to the code flow.

When you run the server code above, the call to Start does the following:

func (g *Engine) Start() error {
     //...
     switch g.network {
     // First part: initialize listener
     case "unix", "tcp", "tcp4", "tcp6":
         for i := range g.addrs {
             ln, err := newPoller(g, true, i)
             if err != nil {
                 for j := 0; j < i; j++ {
                     g.listeners[j].stop()
                 }
                 return err
             }
             g.addrs[i] = ln.listener.Addr().String()
             g.listeners = append(g.listeners, ln)
         }
     //...
     // Second part: initialize a certain number of pollers
     for i := 0; i < g.pollerNum; i++ {
         p, err := newPoller(g, false, i)
         if err != nil {
             for j := 0; j < len(g.listeners); j++ {
                 g.listeners[j].stop()
             }
             for j := 0; j < i; j++ {
                 g.pollers[j].stop()
             }
             return err
         }
         g.pollers[i] = p
     }
     //...
     // Third part: start all worker pollers
     for i := 0; i < g.pollerNum; i++ {
         g.pollers[i].ReadBuffer = make([]byte, g.readBufferSize)
         g.Add(1)
         go g.pollers[i].start()
     }
     // Fourth part: start all listeners
     for _, l := range g.listeners {
         g.Add(1)
         go l.start()
     }
     //... (ignore UDP)
     //...
 }

The code is easy to follow and falls into four parts:

First part: initialize listener

Based on the g.network value (e.g., “unix”, “tcp”, “tcp4”, “tcp6”), create a new poller for each address to listen on. This poller mainly manages events on the listening socket. If an error occurs during creation, stop all previously created listeners and return the error.

Second part: initialize a certain number of pollers

Create the specified number of worker pollers (pollerNum). These pollers handle read/write events on connected sockets. If an error occurs during creation, stop all listeners and the previously created worker pollers, then return the error.

Third part: start all worker pollers

Allocate a read buffer for each worker poller and start each one in its own goroutine.

Fourth part: start all listeners

Start all previously created listeners, each in its own goroutine, so they begin accepting connections on their respective addresses.
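The create-then-roll-back pattern used in the first two parts can be sketched in isolation. The code below is a simplified, hypothetical model using only the standard library: worker stands in for a poller, and startAll and failAt are names invented for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// worker is a hypothetical stand-in for a poller that can be stopped.
type worker struct {
	id      int
	stopped bool
}

func (w *worker) stop() { w.stopped = true }

// startAll creates n workers with newWorker. If creation fails at index i,
// it stops the i workers created so far and returns the error, mirroring the
// cleanup loops in Start.
func startAll(n int, newWorker func(i int) (*worker, error)) ([]*worker, error) {
	workers := make([]*worker, 0, n)
	for i := 0; i < n; i++ {
		w, err := newWorker(i)
		if err != nil {
			for _, created := range workers {
				created.stop()
			}
			return nil, err
		}
		workers = append(workers, w)
	}
	return workers, nil
}

// failAt returns a factory that fails at index k and records every worker
// it creates, so the rollback can be observed from outside.
func failAt(k int, created *[]*worker) func(int) (*worker, error) {
	return func(i int) (*worker, error) {
		if i == k {
			return nil, errors.New("create failed")
		}
		w := &worker{id: i}
		*created = append(*created, w)
		return w, nil
	}
}

func main() {
	var created []*worker
	_, err := startAll(4, failAt(2, &created))
	fmt.Println(err, len(created), created[0].stopped, created[1].stopped)
	// Prints: create failed 2 true true
}
```

Rolling back on the first failure keeps a partially started engine from leaking file descriptors or goroutines.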

Regarding the startup of pollers:

func (p *poller) start() {
     defer p.g.Done()
     //...
     if p.isListener {
         p.acceptorLoop()
     } else {
         defer func() {
             syscall.Close(p.epfd)
             syscall.Close(p.evtfd)
         }()
         p.readWriteLoop()
     }
 }

It’s divided into two cases. If it’s a listener poller:

func (p *poller) acceptorLoop() {
     // Optionally lock this goroutine to its OS thread so it is not scheduled onto other threads.
     if p.g.lockListener {
         runtime.LockOSThread()
         defer runtime.UnlockOSThread()
     }
     p.shutdown = false
     for !p.shutdown {
         conn, err := p.listener.Accept()
         if err == nil {
             var c *Conn
             c, err = NBConn(conn)
             if err != nil {
                 conn.Close()
                 continue
             }
             // Pick a worker poller by connection hash and hand the conn over to it.
             p.g.pollers[c.Hash()%len(p.g.pollers)].addConn(c)
         } else {
             var ne net.Error
             if ok := errors.As(err, &ne); ok && ne.Timeout() {
                 logging.Error("NBIO[%v][%v_%v] Accept failed: temporary error, retrying...", p.g.Name, p.pollType, p.index)
                 time.Sleep(time.Second / 20)
             } else {
                 if !p.shutdown {
                     logging.Error("NBIO[%v][%v_%v] Accept failed: %v, exit...", p.g.Name, p.pollType, p.index, err)
                 }
                 break
             }
         }
     }
 }

The listener poller waits for new connections and, upon arrival, encapsulates them into nbio.Conn after acceptance. Then, it adds the conn to the corresponding worker poller.
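The selection step boils down to a modulo over the number of worker pollers. The sketch below shows that idea on its own. It treats the file descriptor as the hash, which is an assumption made for illustration and not a statement about how nbio computes Hash(); pickPoller is a hypothetical name.

```go
package main

import "fmt"

// pickPoller maps a connection hash to a worker index in [0, n).
// Using the file descriptor as the hash is an assumption of this sketch.
func pickPoller(hash, n int) int {
	return hash % n
}

func main() {
	const pollerNum = 4
	load := make([]int, pollerNum)
	for fd := 5; fd < 5+1000; fd++ { // pretend fds 5..1004 were accepted
		load[pickPoller(fd, pollerNum)]++
	}
	fmt.Println(load) // [250 250 250 250]
}
```

Since the mapping is deterministic, a given connection is always served by the same worker poller, which matches the earlier statement that each conn belongs to only one poller.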

func (p *poller) addConn(c *Conn) {
     c.p = p
     if c.typ != ConnTypeUDPServer {
         p.g.onOpen(c)
     }
     fd := c.fd
     p.g.connsUnix[fd] = c
     err := p.addRead(fd)
     if err != nil {
         p.g.connsUnix[fd] = nil
         c.closeWithError(err)
         logging.Error("[%v] add read event failed: %v", c.fd, err)
     }
 }

An interesting design choice here is how conns are managed. The container is a slice, and the author uses the conn’s fd directly as the index. This has two benefits:

  • With a large number of connections, the garbage collector has less work to do than with a map.
  • It prevents connection mix-ups: the kernel never assigns the same fd to two open sockets at once, so each live fd maps to exactly one slot.

Finally, the corresponding conn fd is added to epoll by calling addRead.

func (p *poller) addRead(fd int) error {
     switch p.g.epollMod {
     case EPOLLET:
         return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN | syscall.EPOLLET})
     default:
         return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN})
     }
 }

It’s reasonable not to register the write event here because there’s no data to send on a new connection. This approach avoids some unnecessary system calls, thereby enhancing program performance.

If the poller being started is a worker poller, its job is to wait for events on the conns added to it and handle them accordingly.

func (p *poller) readWriteLoop() {
     //...
     msec := -1
     events := make([]syscall.EpollEvent, 1024)
     //...
     for !p.shutdown {
         n, err := syscall.EpollWait(p.epfd, events, msec)
         if err != nil && !errors.Is(err, syscall.EINTR) {
             return
         }
         if n <= 0 {
             msec = -1
             continue
         }
         msec = 20
         // Traverse events
         for _, ev := range events[:n] {
             fd := int(ev.Fd)
             switch fd {
             case p.evtfd:
             default:
                 c := p.getConn(fd)
                 if c != nil {
                     if ev.Events&epollEventsError != 0 {
                         c.closeWithError(io.EOF)
                         continue
                     }
                     // If it's writable, flush the data
                     if ev.Events&epollEventsWrite != 0 {
                         c.flush()
                     }
                     // Read event
                     if ev.Events&epollEventsRead != 0 {
                         if p.g.onRead == nil {
                             for i := 0; i < p.g.maxConnReadTimesPerEventLoop; i++ {
                                 buffer := p.g.borrow(c)
                                 rc, n, err := c.ReadAndGetConn(buffer)
                                 if n > 0 {
                                     p.g.onData(rc, buffer[:n])
                                 }
                                 p.g.payback(c, buffer)
                                 //...
                                 if n < len(buffer) {
                                     break
                                 }
                             }
                         } else {
                             p.g.onRead(c)
                         }
                     }
                 } else {
                     syscall.Close(fd)
                 }
             }
         }
     }
 }

This piece of code is also straightforward. It waits for events to arrive, traverses the event list, and handles each event accordingly.

func EpollWait(epfd int, events []EpollEvent, msec int) (n int, err error)

In EpollWait, msec is the parameter you tune. Usually, we set msec = -1, which makes the call block indefinitely until at least one event occurs. This is very useful when events are rare because it minimizes CPU usage.

If you want to respond to events as quickly as possible, you can set msec = 0. This makes EpollWait return immediately without waiting for any events. In this case, your program may call EpollWait more frequently, but it can process events immediately after they occur, leading to higher CPU usage.

If your program can tolerate some delay and you want to reduce CPU usage, you can set msec to a positive number. EpollWait then waits for events for at most that many milliseconds. If no events occur during this time, the function returns, and you can choose to call EpollWait again later. This reduces CPU usage but may result in longer response times.
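The three timeout modes can be observed with a small Linux-only experiment built on the standard syscall package. probe is a helper invented for this sketch: it watches a pipe, optionally makes it readable, and times a single EpollWait call.

```go
package main

import (
	"fmt"
	"syscall"
	"time"
)

// probe watches the read end of a fresh pipe with epoll and calls EpollWait
// once with the given timeout. If ready is true, one byte is written first so
// that an event is already pending. It returns the event count and the
// elapsed time in milliseconds.
func probe(msec int, ready bool) (n int, elapsedMs int64, err error) {
	epfd, err := syscall.EpollCreate1(0)
	if err != nil {
		return 0, 0, err
	}
	defer syscall.Close(epfd)

	var p [2]int
	if err = syscall.Pipe(p[:]); err != nil {
		return 0, 0, err
	}
	defer syscall.Close(p[0])
	defer syscall.Close(p[1])

	ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(p[0])}
	if err = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, p[0], &ev); err != nil {
		return 0, 0, err
	}
	if ready {
		if _, err = syscall.Write(p[1], []byte{1}); err != nil {
			return 0, 0, err
		}
	}

	events := make([]syscall.EpollEvent, 8)
	start := time.Now()
	for {
		n, err = syscall.EpollWait(epfd, events, msec)
		if err != syscall.EINTR { // retry only when interrupted by a signal
			break
		}
	}
	return n, time.Since(start).Milliseconds(), err
}

func main() {
	cases := []struct {
		msec  int
		ready bool
	}{{0, false}, {20, false}, {-1, true}}
	for _, c := range cases {
		n, ms, err := probe(c.msec, c.ready)
		fmt.Printf("msec=%d ready=%v -> n=%d elapsed=%dms err=%v\n", c.msec, c.ready, n, ms, err)
	}
}
```

With msec = 0 the call should return at once with no events, with msec = 20 it should return empty after roughly 20 ms, and with msec = -1 it returns as soon as the pending event is available. The -1 case is only run with a ready pipe because it would otherwise block forever.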

nbio adjusts msec according to the event count: if the count is greater than 0, msec is set to 20; otherwise, it is reset to -1.

The code in ByteDance’s netpoll is similar: if the event count is greater than 0, msec is set to 0. If the event count is less than or equal to 0, msec is set to -1, and then Gosched() is called to voluntarily yield the current goroutine.

var msec = -1
 for {
     n, err = syscall.EpollWait(epfd, events, msec)
     if n <= 0 {
         msec = -1
         runtime.Gosched()
         continue
     }
     msec = 0
     ...
 }

However, the code for voluntary switching in nbio has been commented out. According to the author’s explanation in the issue, initially, he referred to ByteDance’s method and added voluntary switching.

However, during performance testing of nbio, it was found that adding or not adding voluntary switching did not significantly affect performance. Therefore, it was ultimately decided to remove it.
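The two adaptive timeout policies described above differ only in the value chosen after a busy iteration. Written as pure functions, with names that are hypothetical and introduced only for comparison, they look like this:

```go
package main

import "fmt"

// nextTimeoutNbio follows the policy shown in readWriteLoop: after a wakeup
// that delivered events, wait at most 20 ms next time; otherwise block.
func nextTimeoutNbio(n int) int {
	if n > 0 {
		return 20
	}
	return -1
}

// nextTimeoutNetpoll follows the netpoll snippet: after a wakeup that
// delivered events, poll without waiting next time; otherwise block.
func nextTimeoutNetpoll(n int) int {
	if n > 0 {
		return 0
	}
	return -1
}

func main() {
	for _, n := range []int{0, 3} {
		fmt.Println(n, nextTimeoutNbio(n), nextTimeoutNetpoll(n))
	}
	// Prints:
	// 0 -1 -1
	// 3 20 0
}
```

Both policies fall back to a blocking wait as soon as an iteration finds nothing to do, so an idle poller does not spin.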

Next comes the event-processing part.

If it is a readable event, you can obtain the corresponding buffer through built-in or custom memory allocators, and then call ReadAndGetConn to read the data without needing to allocate a buffer every time.
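The read branch of readWriteLoop reads a bounded number of times per event and stops early on a short read. The standard-library sketch below models that loop with an io.Reader in place of a socket. drain and perEvent are hypothetical helpers, and maxReads plays the role of maxConnReadTimesPerEventLoop.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// drain reads from r into the reusable buffer buf at most maxReads times,
// passing each chunk to onData. It stops early on an error or on a short
// read, which suggests that no more data is pending right now.
func drain(r io.Reader, buf []byte, maxReads int, onData func([]byte)) (reads int, err error) {
	for i := 0; i < maxReads; i++ {
		n, rerr := r.Read(buf)
		reads++
		if n > 0 {
			onData(buf[:n])
		}
		if rerr != nil {
			return reads, rerr
		}
		if n < len(buf) {
			break
		}
	}
	return reads, nil
}

// perEvent calls drain `events` times on one reader over s and records the
// chunks delivered by each call. It exists only to make the sketch easy to run.
func perEvent(s string, bufSize, maxReads, events int) [][]string {
	r := strings.NewReader(s)
	buf := make([]byte, bufSize) // one buffer reused across all reads
	out := make([][]string, 0, events)
	for e := 0; e < events; e++ {
		var chunks []string
		drain(r, buf, maxReads, func(b []byte) {
			chunks = append(chunks, string(b)) // copy before the buffer is reused
		})
		out = append(out, chunks)
	}
	return out
}

func main() {
	fmt.Println(perEvent("abcdefghij", 4, 2, 2)) // [[abcd efgh] [ij]]
}
```

Capping the number of reads per event keeps one busy connection from starving the others served by the same poller. In level-triggered mode any data left over simply triggers another readable event.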

If it is a writable event, flush will be called to send out the unsent data in the buffer.

func (c *Conn) flush() error {
   //.....
   old := c.writeBuffer
   n, err := c.doWrite(old)
   if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) {
     //.....
   }
 
   if n < 0 {
     n = 0
   }
   left := len(old) - n
   // Not all data was written, so keep the rest in writeBuffer for the next write.
   if left > 0 {
     if n > 0 {
       c.writeBuffer = mempool.Malloc(left)
       copy(c.writeBuffer, old[n:])
       mempool.Free(old)
     }
     // c.modWrite()
   } else {
     mempool.Free(old)
     c.writeBuffer = nil
     if c.wTimer != nil {
       c.wTimer.Stop()
       c.wTimer = nil
     }
     // All data has been written, so reset the conn to listen for read events only.
     c.resetRead()
     //...
   }
 
   c.mux.Unlock()
   return nil
 }

The logic is also simple: write as much as possible, and if some data cannot be written, put the remainder back into writeBuffer and write again the next time EpollWait reports the conn as writable.

Once everything has been written, there is no more data pending, so the connection’s event registration is reset to read events only.

That’s basically how the main logic works.

Wait a minute, when we initially mentioned that a new connection comes in, we only registered a read event for the connection and didn’t register a write event. When was the write event registered?

Of course, it is registered when you call conn.Write.

g := nbio.NewGopher(nbio.Config{
     Network:            "tcp",
     Addrs:              []string{":8888"},
     MaxWriteBufferSize: 6 * 1024 * 1024,
   })
 
   g.OnData(func(c *nbio.Conn, data []byte) {
     c.Write(append([]byte{}, data...))
   })

When data arrives on a conn, the underlying layer reads it and then invokes the OnData callback. At that point, you can call Write to send data to the other end.

 func (c *Conn) Write(b []byte) (int, error) {
   //....
   n, err := c.write(b)
   if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) {
     //.....
     return n, err
   }
 
   if len(c.writeBuffer) == 0 {
     if c.wTimer != nil {
       c.wTimer.Stop()
       c.wTimer = nil
     }
   } else {
 // There is still data that has not been written, add a write event.
     c.modWrite()
   }
   //.....
   return n, err
 }
 
 func (c *Conn) write(b []byte) (int, error) {
   //...
   if len(c.writeBuffer) == 0 {
     n, err := c.doWrite(b)
     if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) {
       return n, err
     }
     //.....

     left := len(b) - n
 // Not finished yet, put the remaining into writeBuffer.
     if left > 0 && c.typ == ConnTypeTCP {
       c.writeBuffer = mempool.Malloc(left)
       copy(c.writeBuffer, b[n:])
       c.modWrite()
     }
     return len(b), nil
   }
 // If there is still unwritten data in the writeBuffer, the new data will also be appended.
   c.writeBuffer = mempool.Append(c.writeBuffer, b...)
 
   return len(b), nil
 }

When data cannot be written in full, the remainder is put into writeBuffer and modWrite is called, which registers the conn’s write event with epoll.
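The interplay between Write, flush, and the write-event registration can be modelled without sockets. In the hypothetical sketch below, sketchConn is not nbio’s Conn: room simulates how many bytes the kernel accepts per write, and the wantWrite flag stands in for the effect of modWrite and resetRead.

```go
package main

import "fmt"

// sketchConn is a hypothetical connection whose fake socket accepts at most
// room bytes per write.
type sketchConn struct {
	room        int    // bytes the fake socket accepts per doWrite call
	sent        []byte // everything the fake socket has accepted
	writeBuffer []byte // bytes waiting for the next writable event
	wantWrite   bool   // whether a write event is currently registered
}

// doWrite simulates a non-blocking write that may be partial.
func (c *sketchConn) doWrite(b []byte) int {
	n := len(b)
	if n > c.room {
		n = c.room
	}
	c.sent = append(c.sent, b[:n]...)
	return n
}

// Write sends what it can immediately and buffers the rest, registering
// interest in write events only when something is left over.
func (c *sketchConn) Write(b []byte) int {
	if len(c.writeBuffer) > 0 {
		// Preserve ordering: queue behind data that is already pending.
		c.writeBuffer = append(c.writeBuffer, b...)
		return len(b)
	}
	n := c.doWrite(b)
	if n < len(b) {
		c.writeBuffer = append(c.writeBuffer, b[n:]...)
		c.wantWrite = true // stands in for modWrite
	}
	return len(b)
}

// flush runs on a writable event: it drains the buffer and, once it is
// empty, drops write interest again (stands in for resetRead).
func (c *sketchConn) flush() {
	n := c.doWrite(c.writeBuffer)
	c.writeBuffer = c.writeBuffer[n:]
	if len(c.writeBuffer) == 0 {
		c.writeBuffer = nil
		c.wantWrite = false
	}
}

func main() {
	c := &sketchConn{room: 4}
	c.Write([]byte("abcdefghij"))
	fmt.Printf("%q %q %v\n", c.sent, c.writeBuffer, c.wantWrite) // "abcd" "efghij" true
	c.flush()
	fmt.Printf("%q %q %v\n", c.sent, c.writeBuffer, c.wantWrite) // "abcdefgh" "ij" true
	c.flush()
	fmt.Printf("%q %q %v\n", c.sent, c.writeBuffer, c.wantWrite) // "abcdefghij" "" false
}
```

Write interest is only held while there is buffered data, so a connection whose writes always complete in one call never pays for an extra epoll registration.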

Summary

Compared to evio, nbio does not suffer from the thundering herd effect.
evio achieves logical correctness by repeatedly waking up epoll when there is nothing to do, whereas nbio tries to minimize system calls and reduce unnecessary overhead.

In terms of usability, nbio implements the standard library’s net.Conn interface, and many settings are configurable, giving users a high degree of flexibility.
Pre-allocated buffers are used for reading and writing to improve application performance.
In conclusion, nbio is a good high-performance non-blocking network framework.

Licensed under CC BY-NC-SA 4.0