How to trace golang channels?

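2023 is almost over, and it has been a long time since the last update. After racking my brains, I decided to squeeze out two more write-ups before the year ends, just to finish what I started. Looking back on this year, BPF and golang are impossible to avoid. So this first article digs further into observing golang with BPF. If time allows and things go smoothly later this month, I hope to finish a summary of how BPF works as well.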

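In "An Implementation of Non-Intrusive Observation of Service Topology Four-Tuples" (无侵入观测服务拓扑四元组的一种实现), I mentioned two problems that tracing could not solve when following golang's request handling: channels and goroutine pools. On closer inspection, both come down to handling channels, because many goroutine pools are built on top of channels, for example the Jeffail/tunny library.
This article builds a scheme for tracing channels.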

# 1. Tracing Result

As usual, let's look at the result first.

// terminal 1: start the service
$ ./drink-srv

// terminal 2: start the tracing script
$ sudo bpftrace ./drink.bt
Attaching 7 probes...  // pauses here after startup
serve HTTP: /alcohol   // printed once the endpoints are hit
caller: /alcohol, callee: :/unknown/prepare/hotel
serve HTTP: /tea
caller: /tea, callee: :/unknown/prepare/club

// terminal 3: hit the service endpoints
$ curl "localhost:1423/alcohol?age=22"
$ curl "localhost:1423/tea?age=12"

# 2. Design

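For the design and implementation of golang channels, see 图解Go的channel底层实现 (an illustrated walkthrough of Go's channel internals), which includes very vivid animations. It goes down even better alongside the source in runtime/chan.go.
Here is a brief overview of the states involved in the two operations, send and recv.

[Figure: states of chan-send]

[Figure: states of chan-recv]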

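Take the code below as an example. Once the spawned goroutine g1 enters the select, ticketChan is empty, so g1 gives up its right to run on the m and enters the gopark state. At the same time, g1 is wrapped in a sudog and placed on ticketChan's recvq queue. Some time later, when another g writes data into the channel, chansend finds that recvq is not empty and copies the data directly to the waiting sudog instead of going through the buffer.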

var ticketChan = make(chan TicketInfo, 10)

func HandleDrink() {
    for {
        select {
        case info, ok := <-ticketChan:
            ...
        }
    }
}

func main() {
    go HandleDrink()
    ...
}
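
To make the parking and direct hand-off described above a bit more concrete, here is a minimal sketch. The function name `handoff` and the 50ms sleep are my own assumptions for illustration. The sleep only makes it likely that the receiver has parked before the send; it is not a guarantee. When the receiver is already waiting, the buffer length observed right after the send is typically 0, because the value bypasses the buffer.

```go
package main

import (
	"fmt"
	"time"
)

// handoff starts a receiver on an empty buffered channel, gives it time to
// park, then sends one value. It returns the value the receiver saw and the
// buffer length observed right after the send.
func handoff(v int) (got int, bufLen int) {
	ch := make(chan int, 10)
	done := make(chan int)

	go func() {
		// The channel is empty, so this receive blocks and the goroutine parks.
		done <- <-ch
	}()

	// Assumption: 50ms is enough for the receiver to park. This is a
	// demo-only heuristic, not a synchronization guarantee.
	time.Sleep(50 * time.Millisecond)

	ch <- v
	bufLen = len(ch) // typically 0 when the value went straight to the receiver
	got = <-done
	return got, bufLen
}

func main() {
	got, bufLen := handoff(42)
	fmt.Printf("received=%d, len(ch) right after send=%d\n", got, bufLen)
}
```

This direct path is the one the demo service exercises, since HandleDrink is almost always parked before a request arrives.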

The golang logic where chanrecv enqueues the receiver on recvq is here:

// runtime/chan.go
	...
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
    ...
    

The golang logic where chansend copies the data directly to a receiver waiting on recvq is here:

// runtime/chan.go
	...
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
    ...
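
The two runtime excerpts above can be summarized with a toy model. This is a single-threaded sketch, not the runtime's implementation: `toyChan` and `waiter` are hypothetical stand-ins for `hchan` and `sudog`, and there is no locking or scheduling. It only shows the decision order: a send first looks for a waiting receiver and falls back to the buffer otherwise.

```go
package main

import "fmt"

// waiter stands in for runtime.sudog: a parked receiver plus the address
// where it wants the value delivered.
type waiter struct {
	goid int
	elem *int
}

// toyChan is a simplified, single-threaded stand-in for runtime.hchan.
type toyChan struct {
	buf   []int
	size  int
	recvq []*waiter
}

// recv returns true if a buffered value was available. Otherwise it queues
// the receiver on recvq, which models the goroutine parking.
func (c *toyChan) recv(goid int, dst *int) bool {
	if len(c.buf) > 0 {
		*dst = c.buf[0]
		c.buf = c.buf[1:]
		return true
	}
	c.recvq = append(c.recvq, &waiter{goid: goid, elem: dst})
	return false
}

// send hands the value to a waiting receiver if there is one and returns its
// goid. Otherwise it buffers the value and returns goid 0. ok is false when
// the buffer is full (a real channel would park the sender instead).
func (c *toyChan) send(v int) (receiver int, ok bool) {
	if len(c.recvq) > 0 {
		w := c.recvq[0]
		c.recvq = c.recvq[1:]
		*w.elem = v // direct copy, the buffer is bypassed
		return w.goid, true
	}
	if len(c.buf) < c.size {
		c.buf = append(c.buf, v)
		return 0, true
	}
	return 0, false
}

func main() {
	c := &toyChan{size: 10}
	var slot int
	fmt.Println(c.recv(7, &slot)) // false: receiver 7 is queued
	r, _ := c.send(99)
	fmt.Println(r, slot, len(c.buf)) // 7 99 0
}
```

The point for tracing is that the direct path identifies the receiving goroutine at send time, while the buffered path does not, so the two paths need different hook points.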
    

# 3. Implementation

With the channel flow understood, the tracing scheme becomes fairly clear: set hook points directly at the key functions. First, the target service:

package main

import (
	"log"
	"net/http"
	"strconv"

	"github.com/gin-gonic/gin"
)

const (
	ALCOHOL = iota + 1001
	COCO
	COFFEE
	TEA
)

type TicketInfo struct {
	Age  int
	Name string
	Type int
}

var ticketChan = make(chan TicketInfo, 10)

func AlcoholH(c *gin.Context) {
	var ticket = TicketInfo{}
	var err error
	ticket.Age, err = strconv.Atoi(c.Query("age"))
	if err != nil {
		c.String(http.StatusOK, "handle failed")
		return
	}
	ticket.Name = c.Query("name")
	ticket.Type = ALCOHOL
	ticketChan <- ticket
	return
}

func TeaH(c *gin.Context) {
	var ticket = TicketInfo{}
	var err error
	ticket.Age, err = strconv.Atoi(c.Query("age"))
	if err != nil {
		log.Println("handle failed, ", err.Error())
		c.String(http.StatusOK, "handle failed")
		return
	}
	ticket.Name = c.Query("name")
	ticket.Type = TEA
	ticketChan <- ticket
	c.String(http.StatusOK, "okay")
	return
}

func HandleDrink() {
	for {
		select {
		case info, ok := <-ticketChan:
			if !ok {
				log.Println("chan closed.")
				return
			}
			log.Println("get ticket")
			switch info.Type {
			case ALCOHOL:
				Alcohol(info)
			case COCO:
				SoftDrink(info)
			case COFFEE, TEA:
				Tea(info)
			default:
				log.Println("unknown drink type")
			}
		}
	}
}

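func Alcohol(ticket TicketInfo) {
	var url = "http://localhost/unknown/prepare/hotel"
	// Close the response body on success so the connection is not leaked.
	if resp, err := http.DefaultClient.Get(url); err == nil {
		resp.Body.Close()
	}
}

func SoftDrink(ticket TicketInfo) {
	log.Printf("[%s, %d] drink %d", ticket.Name, ticket.Age, ticket.Type)
}

func Tea(ticket TicketInfo) {
	var url = "http://localhost/unknown/prepare/club"
	// Close the response body on success so the connection is not leaked.
	if resp, err := http.DefaultClient.Get(url); err == nil {
		resp.Body.Close()
	}
}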

func main() {
	defer func() {
		close(ticketChan)
	}()

	go HandleDrink()

	var r = gin.Default()
	r.GET("/alcohol", AlcoholH)
	r.GET("/tea", TeaH)

	var srv = &http.Server{
		Addr:    "127.0.0.1:1423",
		Handler: r,
	}
	if srv.ListenAndServe() != nil {
		log.Println("failed to handle service listen")
		return
	}
}

For a service like this, the script that produces the tracing result shown in the example is:

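// These offsets depend on the kernel build and the Go version in use;
// re-derive them for your own environment.
// offset of the thread field inside task_struct
#define OFF_TASK_THRD 4992
// offset of fsbase inside thread_struct
#define OFF_THRD_FSBASE 40
// offset of goid inside runtime.g
#define GOID_OFFSET 152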

uprobe:./drink-srv:"runtime.runqput"
{
  $prob_mark = "runqput";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  if (@new_go[tid, pid] == 0){
    return;
  }
  $p_goid = @new_go[tid, pid];

  $g = (uint64)(reg("bx"));
  $goid = *(uint64*)($g+GOID_OFFSET);
  @caller_addr[$goid] = @caller_addr[$p_goid];
  @caller_len[$goid]  = @caller_len[$p_goid];
}

uprobe:./drink-srv:"runtime.newproc1"
{
  $prob_mark = "newproc1";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $g = (uint64)(reg("bx"));
  $goid = *(uint64*)($g+GOID_OFFSET);
  if (@caller_addr[$goid] == 0){
    return;
  }
  @new_go[tid, pid] = $goid;
}

// Here, associate the caller info with the key of the value written to the channel
uprobe:./drink-srv:"runtime.chansend"
{
  $prob_mark = "chansend";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $cur = (uint64)curtask;
  $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
  $g = *(uint64*)($fsbase-8);
  $goid = *(uint64*)($g+GOID_OFFSET);

  // Skip if the currently running goroutine has no caller recorded
  if(@caller_addr[$goid] == 0){
    return;
  }

  $chan = (uint64)reg("ax");
  $qcount = *(uint32*)($chan + 0);
  $buf = *(uint64*)($chan+16);
  @send_addr[$chan, $qcount] = @caller_addr[$goid];
  @send_len[$chan, $qcount]  = @caller_len[$goid];

  return;
}


uprobe:./drink-srv:"runtime.chanrecv"
{
  $prob_mark = "chanrecv";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $cur = (uint64)curtask;
  $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
  $g = *(uint64*)($fsbase-8);
  $goid = *(uint64*)($g+GOID_OFFSET);

  $chan = (uint64)reg("ax");
  $qcount = *(uint32*)($chan + 0);
  $buf = *(uint64*)($chan+16);
  if (@send_addr[$chan, $qcount] == 0){
    return;
  }
  @caller_addr[$goid] = @send_addr[$chan, $qcount];
  @caller_len[$goid]  = @send_len[$chan, $qcount];

  return;
}

uprobe:./drink-srv:"runtime.send"
{
  $prob_mark = "send";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $chan = (uint64)reg("ax");
  $sg = (uint64)reg("bx");
  $g = *(uint64*)($sg+0);
  $goid = *(uint64*)($g+GOID_OFFSET);

  $qcount = *(uint32*)($chan+0);

  @caller_addr[$goid] = @send_addr[$chan, $qcount];
  @caller_len[$goid]  = @send_len[$chan, $qcount];

  return;
}


/*
type serverHandler struct {
	srv *Server
}
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
*/
uprobe:./drink-srv:"net/http.serverHandler.ServeHTTP"
{
  $prob_mark = "ServeHTTP";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $cur = (uint64)curtask;
  $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
  $g = *(uint64*)($fsbase-8);
  $goid = *(uint64*)($g+GOID_OFFSET);

  $req_addr = reg("di");
  // offset(Request.URL) = 16
  $url_addr = *(uint64*)($req_addr+16);
  // offset(URL.Path) = 56
  $path_addr = *(uint64*)($url_addr+56);
  $path_len  = *(uint64*)($url_addr+64);

  @caller_addr[$goid] = $path_addr;
  @caller_len[$goid]  = $path_len;

  printf("serve HTTP: %s\n", str($path_addr, $path_len));

  return;
}

/*
func (c *Client) do(req *Request) (retres *Response, reterr error) {
*/
uprobe:./drink-srv:"net/http.(*Client).do"
{
  $prob_mark = "do";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $cur = (uint64)curtask;
  $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
  $g = *(uint64*)($fsbase-8);
  $goid = *(uint64*)($g+GOID_OFFSET);

  if (@caller_addr[$goid] == 0){
    printf("%d: has no caller.\n", $goid);
    return;
  }

  $req_addr = reg("bx");
  // offset(Request.URL) = 16
  $url_addr = *(uint64*)($req_addr+16);
  // offset(URL.Host) = 40, relative to the URL struct (not the Request)
  $host_addr = *(uint64*)($url_addr+40);
  $host_len  = *(uint64*)($url_addr+48);
  // offset(URL.Path) = 56
  $path_addr = *(uint64*)($url_addr+56);
  $path_len  = *(uint64*)($url_addr+64);

  $c_addr = @caller_addr[$goid];
  $c_len  = @caller_len[$goid];

  printf("caller: %s, callee: %s:%s\n", str($c_addr, $c_len),
    str($host_addr, $host_len), str($path_addr, $path_len));
}

# 4. Risks of Tracing

At this point it looks as though golang channels can be traced. In practice it is not that simple. Take the following example:

func HandleDrink() {
    for {
        select {
        case info, ok := <-ticketChan:
            ...
        default: // note this default branch
            // nothing blocks here, so the loop keeps spinning
        }
    }
}

This logic raises no problems when it is written or compiled; it is entirely legitimate code. But when we try to trace it:

$ sudo bpftrace ./drink.bt
Attaching 7 probes...
^C  // stopped right away, without sending any request
@prob[chanrecv]: 908571

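Note that nothing was done to the service here, yet the chanrecv hook had already fired hundreds of thousands of times (908,571 in this run). And as we know, firing a BPF hook is not free. So even when the target code is perfectly reasonable, our tracing program can put a heavy load on the system. This is clearly something we need to avoid.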

That's all. Have a nice weekend~
