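2023 is almost over, and it has been quite a while since my last update. After racking my brains, I feel I should squeeze out a couple of summary posts before the year ends, so the year closes properly. Looking back, this year still revolves around BPF and Go. So I will dig a little deeper into using BPF to observe Go, starting with this first post. If time allows and things go smoothly later this month, I hope to also finish a write-up on how BPF works.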
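In 无侵入观测服务拓扑四元组的一种实现 (an implementation of non-intrusively observing the service-topology 4-tuple), I mentioned two problems I could not solve when tracing Go request handling: channels and goroutine pools. Digging a little deeper, both actually reduce to handling channels, because many goroutine pools rely on channels internally, for example the Jeffail/tunny library.

This post builds a scheme for tracing across channels.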
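The sketch below shows why a pool turns into a channel problem. It uses a hand-rolled pool, and the `pool`, `job`, `newPool` and `Process` names are hypothetical, not tunny's API. `Process` runs on the caller's goroutine and the work runs on a worker goroutine. The only link between the two is the `jobs` channel. Anything a tracer has attached to the caller's goroutine stays behind unless the tracer can follow that channel hop.

```go
package main

import (
	"fmt"
	"sync"
)

// job is one unit of work plus a channel on which the worker returns the result.
type job struct {
	input  int
	result chan int
}

// pool is a hypothetical fixed-size worker pool: jobs reach workers only via the jobs channel.
type pool struct {
	jobs chan job
	wg   sync.WaitGroup
}

func newPool(workers int, fn func(int) int) *pool {
	p := &pool{jobs: make(chan job)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			// Each worker runs on its own goroutine and picks jobs off the channel.
			for j := range p.jobs {
				j.result <- fn(j.input)
			}
		}()
	}
	return p
}

// Process is called on the caller's goroutine; the work itself runs on a worker goroutine.
func (p *pool) Process(in int) int {
	res := make(chan int, 1)
	p.jobs <- job{input: in, result: res}
	return <-res
}

// Close stops accepting jobs and waits for all workers to exit.
func (p *pool) Close() {
	close(p.jobs)
	p.wg.Wait()
}

func main() {
	p := newPool(2, func(x int) int { return x * x })
	defer p.Close()
	fmt.Println(p.Process(3), p.Process(4)) // 9 16
}
```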
## 1. Tracing Results
As usual, let's start with the result.
```
// terminal 1: start the service
$ ./drink-srv
// terminal 2: start the tracing script
$ sudo bpftrace ./drink.bt
Attaching 7 probes...   // blocks here after startup
serve HTTP: /alcohol    // printed once an endpoint is hit
caller: /alcohol, callee: :/unknown/prepare/hotel
serve HTTP: /tea
caller: /tea, callee: :/unknown/prepare/club
// terminal 3: hit the service endpoints
$ curl "localhost:1423/alcohol?age=22"
$ curl "localhost:1423/tea?age=12"
```
## 2. Design
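For how Go channels are implemented and designed, see 图解Go的channel底层实现 (an illustrated walkthrough of Go channel internals). It has very vivid animations and is best read together with the source, runtime/chan.go. Below is a brief recap of the states that the `send` and `recv` operations go through: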
[Figure: states of `chan-send`]

[Figure: states of `chan-recv`]
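The send and recv states above come down to a fixed order of checks in `chansend` and `chanrecv`. The toy model below sketches that order. It is simplified and hypothetical, not the runtime code:

- goroutines are just names;
- there is no locking;
- nil channels, `select` and close-time wakeups are left out.

For a receive from a full buffer with a parked sender, it follows the runtime: it takes the buffer head and refills the tail from the sender.

```go
package main

import "fmt"

// sender is a goroutine parked on sendq together with the value it wants to send.
type sender struct {
	name  string
	value int
}

// toyChan mimics a few hchan ideas: a FIFO buffer of capacity size and two wait queues.
type toyChan struct {
	buf    []int
	size   int
	closed bool
	recvq  []string // parked receivers
	sendq  []sender // parked senders
}

// send reports which path a send takes, checked in chansend's order.
func (c *toyChan) send(from string, v int) string {
	if c.closed {
		panic("send on closed channel")
	}
	if len(c.recvq) > 0 { // a receiver is waiting: hand the value over directly
		r := c.recvq[0]
		c.recvq = c.recvq[1:]
		return fmt.Sprintf("handoff %d to %s", v, r)
	}
	if len(c.buf) < c.size { // room left in the buffer
		c.buf = append(c.buf, v)
		return "buffered"
	}
	c.sendq = append(c.sendq, sender{from, v}) // the real sender would gopark here
	return "block on sendq"
}

// recv reports the value, whether a real value was received, and the path taken,
// checked in chanrecv's order.
func (c *toyChan) recv(by string) (v int, ok bool, path string) {
	if c.closed && len(c.buf) == 0 {
		return 0, false, "closed and empty"
	}
	if !c.closed && len(c.sendq) > 0 {
		s := c.sendq[0]
		c.sendq = c.sendq[1:]
		if c.size == 0 { // unbuffered: take the value straight from the sender
			return s.value, true, "taken from " + s.name
		}
		// Buffered and full: take the head, move the parked sender's value to the tail.
		v = c.buf[0]
		c.buf = append(c.buf[1:], s.value)
		return v, true, "buffer head, refilled from " + s.name
	}
	if len(c.buf) > 0 {
		v = c.buf[0]
		c.buf = c.buf[1:]
		return v, true, "buffered"
	}
	c.recvq = append(c.recvq, by) // the real receiver would gopark here
	return 0, false, "block on recvq"
}

func main() {
	c := &toyChan{size: 1}
	fmt.Println(c.send("g0", 1)) // buffered
	fmt.Println(c.send("g0", 2)) // block on sendq
	v, _, path := c.recv("g1")
	fmt.Println(v, path) // 1 buffer head, refilled from g0
	v, _, path = c.recv("g1")
	fmt.Println(v, path) // 2 buffered
	_, _, path = c.recv("g1")
	fmt.Println(path) // block on recvq
	fmt.Println(c.send("g2", 3)) // handoff 3 to g1
}
```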
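Take the code below as an example. The spawned goroutine `g1` enters `select`, finds `ticketChan` empty, and gives up the M it is running on: it is parked via `gopark`. At the same time, the runtime wraps `g1` in a `sudog` and puts it on `ticketChan`'s `recvq` queue.

Some time later another goroutine writes to the channel. `chansend` then finds that `recvq` is not empty. It copies the value straight to the waiting receiver through its `sudog`, bypassing the buffer, and makes `g1` runnable again.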
```go
var ticketChan = make(chan TicketInfo, 10)

func HandleDrink() {
	for {
		select {
		case info, ok := <-ticketChan:
			...
		}
	}
}

func main() {
	go HandleDrink()
	...
}
```
The runtime logic where `chanrecv` puts the goroutine on `recvq` is here:
```go
// runtime/chan.go
...
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
	mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
...
```
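`c.recvq.enqueue(mysg)` appends the sudog to a FIFO queue, and `chansend` later takes the oldest waiter from the front. Below is a stripped-down sketch of that queue. The field names follow `runtime.waitq` and `runtime.sudog` (`first`/`last`, `next`/`prev`, `g`, `elem`), but the types are simplified stand-ins. The real `dequeue` also skips sudogs whose `select` has already been won by another case.

The last step in `main` shows what "bypassing the buffer" means: the value is written straight through the receiver's `elem` pointer.

```go
package main

import "fmt"

// sudog is a simplified stand-in for runtime.sudog: which goroutine waits,
// and where its received value should be written.
type sudog struct {
	g          string // stands in for *g
	elem       *int   // stands in for the unsafe.Pointer to the receiver's variable
	next, prev *sudog
}

// waitq mirrors runtime.waitq: a FIFO doubly linked list of sudogs.
type waitq struct {
	first, last *sudog
}

// enqueue appends sg at the tail.
func (q *waitq) enqueue(sg *sudog) {
	sg.next = nil
	x := q.last
	if x == nil {
		sg.prev = nil
		q.first, q.last = sg, sg
		return
	}
	sg.prev = x
	x.next = sg
	q.last = sg
}

// dequeue removes and returns the head, or nil if the queue is empty.
func (q *waitq) dequeue() *sudog {
	sg := q.first
	if sg == nil {
		return nil
	}
	q.first = sg.next
	if q.first == nil {
		q.last = nil
	} else {
		q.first.prev = nil
	}
	sg.next = nil
	return sg
}

func main() {
	var recvq waitq
	var a, b int // the parked receivers' destination variables
	recvq.enqueue(&sudog{g: "g1", elem: &a})
	recvq.enqueue(&sudog{g: "g2", elem: &b})

	// Roughly what chansend does when recvq is non-empty: take the oldest
	// waiter and write the value through its elem pointer, no buffer involved.
	if sg := recvq.dequeue(); sg != nil {
		*sg.elem = 42
		fmt.Println("delivered to", sg.g) // delivered to g1
	}
	fmt.Println(a, b) // 42 0
}
```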
The runtime logic where `chansend` copies the data directly to a receiver taken from `recvq` is here:
```go
// runtime/chan.go
...
if sg := c.recvq.dequeue(); sg != nil {
	// Found a waiting receiver. We pass the value we want to send
	// directly to the receiver, bypassing the channel buffer (if any).
	send(c, sg, ep, func() { unlock(&c.lock) }, 3)
	return true
}
...
```
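This path is useful for tracing because `send()` receives the parked receiver's `sudog`, and `sudog.g` is the receiving goroutine. At that one call, both sides of the channel hop are known.

The toy model below sketches how a tracer could move a per-goroutine tag across the hop; here the tag is the HTTP path that started the work. It is a plain-Go simulation. The hypothetical names stand in for BPF maps keyed by goroutine ID and by `(channel address, qcount)`. It only covers the direct-handoff case, where the buffer is still empty when both hooks fire.

```go
package main

import "fmt"

// chanKey stands in for a (channel address, qcount) map key.
type chanKey struct {
	ch     uintptr
	qcount uint64
}

// tracer holds what BPF maps would hold: a caller tag per goroutine ID and
// a caller tag per pending channel write.
type tracer struct {
	callerByGoid map[uint64]string
	callerBySend map[chanKey]string
}

func newTracer() *tracer {
	return &tracer{
		callerByGoid: map[uint64]string{},
		callerBySend: map[chanKey]string{},
	}
}

// onServeHTTP tags the request-handling goroutine with the request path.
func (tr *tracer) onServeHTTP(goid uint64, path string) {
	tr.callerByGoid[goid] = path
}

// onChansend runs on the sending goroutine: park its tag under the channel key.
func (tr *tracer) onChansend(goid uint64, k chanKey) {
	if c, ok := tr.callerByGoid[goid]; ok {
		tr.callerBySend[k] = c
	}
}

// onSend runs when the runtime hands the value to a parked receiver; the
// receiver's goroutine ID comes from sudog.g, so the tag can move to it.
func (tr *tracer) onSend(k chanKey, receiverGoid uint64) {
	if c, ok := tr.callerBySend[k]; ok {
		tr.callerByGoid[receiverGoid] = c
	}
}

// onClientDo attributes an outgoing request to the current goroutine's tag.
func (tr *tracer) onClientDo(goid uint64, target string) string {
	c, ok := tr.callerByGoid[goid]
	if !ok {
		return fmt.Sprintf("%d: has no caller", goid)
	}
	return fmt.Sprintf("caller: %s, callee: %s", c, target)
}

func main() {
	tr := newTracer()
	const handlerG, workerG = 7, 5        // hypothetical goroutine IDs
	k := chanKey{ch: 0x1000, qcount: 0} // hypothetical channel; receiver parked, buffer empty

	tr.onServeHTTP(handlerG, "/alcohol")
	tr.onChansend(handlerG, k)
	tr.onSend(k, workerG)
	fmt.Println(tr.onClientDo(workerG, "localhost/unknown/prepare/hotel"))
	// caller: /alcohol, callee: localhost/unknown/prepare/hotel
}
```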
## 3. Implementation
With the channel flow understood, the tracing scheme is fairly clear: set hooks at the key functions. First, here is the target service:
```go
package main

import (
	"log"
	"net/http"
	"strconv"

	"github.com/gin-gonic/gin"
)

const (
	ALCOHOL = iota + 1001
	COCO
	COFFEE
	TEA
)

type TicketInfo struct {
	Age  int
	Name string
	Type int
}

var ticketChan = make(chan TicketInfo, 10)

func AlcoholH(c *gin.Context) {
	var ticket = TicketInfo{}
	var err error
	ticket.Age, err = strconv.Atoi(c.Query("age"))
	if err != nil {
		c.String(http.StatusOK, "handle failed")
		return
	}
	ticket.Name = c.Query("name")
	ticket.Type = ALCOHOL
	ticketChan <- ticket
	return
}

func TeaH(c *gin.Context) {
	var ticket = TicketInfo{}
	var err error
	ticket.Age, err = strconv.Atoi(c.Query("age"))
	if err != nil {
		log.Println("handle failed, ", err.Error())
		c.String(http.StatusOK, "handle failed")
		return
	}
	ticket.Name = c.Query("name")
	ticket.Type = TEA
	ticketChan <- ticket
	c.String(http.StatusOK, "okay")
	return
}

func HandleDrink() {
	for {
		select {
		case info, ok := <-ticketChan:
			if !ok {
				log.Println("chan closed.")
				return
			}
			log.Println("get ticket")
			switch info.Type {
			case ALCOHOL:
				Alcohol(info)
			case COCO:
				SoftDrink(info)
			case COFFEE, TEA:
				Tea(info)
			default:
				log.Println("unknown drink type")
			}
		}
	}
}

func Alcohol(ticket TicketInfo) {
	var url = "http://localhost/unknown/prepare/hotel"
	http.DefaultClient.Get(url)
	return
}

func SoftDrink(ticket TicketInfo) {
	log.Printf("[%s, %d] drink %d", ticket.Name, ticket.Age, ticket.Type)
	return
}

func Tea(ticket TicketInfo) {
	var url = "http://localhost/unknown/prepare/club"
	http.DefaultClient.Get(url)
	return
}

func main() {
	defer func() {
		close(ticketChan)
	}()
	go HandleDrink()
	var r = gin.Default()
	r.GET("/alcohol", AlcoholH)
	r.GET("/tea", TeaH)
	var srv = &http.Server{
		Addr:    "127.0.0.1:1423",
		Handler: r,
	}
	if srv.ListenAndServe() != nil {
		log.Println("failed to handle service listen")
		return
	}
}
```
For a service like this, the following script produces the trace output shown at the beginning:
```
// The offsets below are build-specific; verify them for your kernel and Go toolchain.
// offsetof(struct task_struct, thread): depends on the kernel build
#define OFF_TASK_THRD 4992
// offsetof(struct thread_struct, fsbase) on x86-64
#define OFF_THRD_FSBASE 40
// offsetof(runtime.g, goid): depends on the Go version
#define GOID_OFFSET 152

// runqput(pp *p, gp *g, next bool): BX holds the newly created goroutine.
// If newproc1 recorded a parent on this thread, the child inherits its caller.
uprobe:./drink-srv:"runtime.runqput"
{
    $prob_mark = "runqput";
    @prob[$prob_mark] = @prob[$prob_mark] + 1;
    if (@new_go[tid, pid] == 0){
        return;
    }
    $p_goid = @new_go[tid, pid];
    $g = (uint64)(reg("bx"));
    $goid = *(uint64*)($g+GOID_OFFSET);
    @caller_addr[$goid] = @caller_addr[$p_goid];
    @caller_len[$goid] = @caller_len[$p_goid];
}

// newproc1(fn *funcval, callergp *g, ...): BX holds the parent goroutine.
// Remember the parent's goid on this thread if it carries a caller.
uprobe:./drink-srv:"runtime.newproc1"
{
    $prob_mark = "newproc1";
    @prob[$prob_mark] = @prob[$prob_mark] + 1;
    $g = (uint64)(reg("bx"));
    $goid = *(uint64*)($g+GOID_OFFSET);
    if (@caller_addr[$goid] == 0){
        return;
    }
    @new_go[tid, pid] = $goid;
}

// chansend(c *hchan, ...): AX holds the channel.
// Associate the caller info with a key describing this write into the channel.
uprobe:./drink-srv:"runtime.chansend"
{
    $prob_mark = "chansend";
    @prob[$prob_mark] = @prob[$prob_mark] + 1;
    // Go on linux/amd64 keeps the current g pointer at fs:-8
    $cur = (uint64)curtask;
    $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
    $g = *(uint64*)($fsbase-8);
    $goid = *(uint64*)($g+GOID_OFFSET);
    // Skip if the current goroutine carries no caller
    if(@caller_addr[$goid] == 0){
        return;
    }
    $chan = (uint64)reg("ax");
    // hchan.qcount is at offset 0, hchan.buf at offset 16
    $qcount = *(uint32*)($chan + 0);
    $buf = *(uint64*)($chan+16);
    @send_addr[$chan, $qcount] = @caller_addr[$goid];
    @send_len[$chan, $qcount] = @caller_len[$goid];
    return;
}

// chanrecv(c *hchan, ...): AX holds the channel.
// Hand the caller recorded under (channel, qcount) to the receiving goroutine.
uprobe:./drink-srv:"runtime.chanrecv"
{
    $prob_mark = "chanrecv";
    @prob[$prob_mark] = @prob[$prob_mark] + 1;
    $cur = (uint64)curtask;
    $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
    $g = *(uint64*)($fsbase-8);
    $goid = *(uint64*)($g+GOID_OFFSET);
    $chan = (uint64)reg("ax");
    $qcount = *(uint32*)($chan + 0);
    $buf = *(uint64*)($chan+16);
    if (@send_addr[$chan, $qcount] == 0){
        return;
    }
    @caller_addr[$goid] = @send_addr[$chan, $qcount];
    @caller_len[$goid] = @send_len[$chan, $qcount];
    return;
}

// send(c *hchan, sg *sudog, ...): runs when a sender finds a parked receiver.
// sudog.g (offset 0) is the receiving goroutine.
uprobe:./drink-srv:"runtime.send"
{
    $prob_mark = "send";
    @prob[$prob_mark] = @prob[$prob_mark] + 1;
    $chan = (uint64)reg("ax");
    $sg = (uint64)reg("bx");
    $g = *(uint64*)($sg+0);
    $goid = *(uint64*)($g+GOID_OFFSET);
    $qcount = *(uint32*)($chan+0);
    @caller_addr[$goid] = @send_addr[$chan, $qcount];
    @caller_len[$goid] = @send_len[$chan, $qcount];
    return;
}

/*
type serverHandler struct {
    srv *Server
}
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
*/
// sh (one pointer) is in AX and rw (an interface) in BX/CX, so req lands in DI.
uprobe:./drink-srv:"net/http.serverHandler.ServeHTTP"
{
    $prob_mark = "ServeHTTP";
    @prob[$prob_mark] = @prob[$prob_mark] + 1;
    $cur = (uint64)curtask;
    $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
    $g = *(uint64*)($fsbase-8);
    $goid = *(uint64*)($g+GOID_OFFSET);
    $req_addr = reg("di");
    // offset(Request.URL) = 16
    $url_addr = *(uint64*)($req_addr+16);
    // offset(URL.Path) = 56
    $path_addr = *(uint64*)($url_addr+56);
    $path_len = *(uint64*)($url_addr+64);
    @caller_addr[$goid] = $path_addr;
    @caller_len[$goid] = $path_len;
    printf("serve HTTP: %s\n", str($path_addr, $path_len));
    return;
}

/*
func (c *Client) do(req *Request) (retres *Response, reterr error) {
*/
// c is in AX, req in BX.
uprobe:./drink-srv:"net/http.(*Client).do"
{
    $prob_mark = "do";
    @prob[$prob_mark] = @prob[$prob_mark] + 1;
    $cur = (uint64)curtask;
    $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
    $g = *(uint64*)($fsbase-8);
    $goid = *(uint64*)($g+GOID_OFFSET);
    if (@caller_addr[$goid] == 0){
        printf("%d: has no caller.\n", $goid);
        return;
    }
    $req_addr = reg("bx");
    // offset(Request.URL) = 16
    $url_addr = *(uint64*)($req_addr+16);
    // offset(URL.Host) = 40
    $host_addr = *(uint64*)($url_addr+40);
    $host_len = *(uint64*)($url_addr+48);
    // offset(URL.Path) = 56
    $path_addr = *(uint64*)($url_addr+56);
    $path_len = *(uint64*)($url_addr+64);
    $c_addr = @caller_addr[$goid];
    $c_len = @caller_len[$goid];
    printf("caller: %s, callee: %s:%s\n", str($c_addr, $c_len),
        str($host_addr, $host_len), str($path_addr, $path_len));
}
```
## 4. Risks of Tracing
At this point it looks as if Go channels can be traced. In practice, that is not always the case. Take the following example:
```go
func HandleDrink() {
	for {
		select {
		case info, ok := <-ticketChan:
			...
		default: // note this branch
			// nothing here stops the loop
		}
	}
}
```
This logic causes no trouble when it is written or compiled; it is perfectly valid code. But when we try to trace it:
```
$ sudo bpftrace ./drink.bt
Attaching 7 probes...
^C   // stopped right away, no requests sent
@prob[chanrecv]: 908571
```
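Note that nothing was done at all, yet the `chanrecv` hook had already fired hundreds of thousands of times (908,571). Firing a BPF hook is not free: every uprobe hit traps into the kernel to run the BPF program. So even though the target code is perfectly valid, our tracer puts a heavy load on the system, which is clearly something we need to avoid.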
That's all for this one. Have a nice weekend~