如何追踪golang channel?

·4min·李岩

2023年就要结束了,算起来距离上一次更新也有很久了。搜肠刮肚,总得在23年结束前再搞两篇总结,算是有始有终。总结今年,总还是绕不过 BPF,golang。既然如此,就对BPF观测golang这个话题再往下挖掘下,先做第一篇文章。下旬如果有时间并且顺利的话,希望能把BPF的原理总结完成。

无侵入观测服务拓扑四元组的一种实现中,笔者有提到追踪golang处理过程的两个无法解决的问题是golang里的channel处理以及goroutine pool。再深究下,这两个问题实际上都可以归纳到对channel的处理,因为很多goroutine pool都离不了channel的使用,比如Jeffail/tunny这个库。
本文将会构建一个channel的追踪的方案。

一、追踪效果

按照惯例,我们还是来看下效果。

// terminal 1,启动服务
$ ./drink-srv

// terminal 2,启动追踪脚本
$ sudo bpftrace ./drink.bt
Attaching 7 probes...  // 启动后停止在这里
serve HTTP: /alcohol   // 触发接口后输出
caller: /alcohol, callee: :/unknown/prepare/hotel
serve HTTP: /tea
caller: /tea, callee: :/unknown/prepare/club

// terminal 3,触发服务接口
$ curl "localhost:1423/alcohol?age=22"
$ curl "localhost:1423/tea?age=12"

二、方案设计

关于golang channel的实现及设计,可以参见图解Go的channel底层实现,里面有非常生动的动图实现;搭配源码食用更好runtime/chan.go
笔者在这里再简单的总结下,对sendrecv两种操作设计的状态做一个简单的概述: chan-send的状态: overwrote existing file chan-recv的状态: overwrote existing file

比如,对于下面的代码,派生出的g1在开启select后,由于ticketChan是空的,会触发g1让出m里的执行权限,进入gopark状态。同时,ticketChan会将g1封装成sudog,放到recvq队列中。当一段时间之后,其他的g将数据写入channel里时,会在chansend时,检查到recvq不为空,会直接将数据拷贝到空闲的sudog中。

var ticketChan = make(chan TicketInfo, 10)
func HandleDrink() {
    for {
        select {
        case info, ok := <-ticketChan:
			...
            }
        }
    }
}
func main() {
    go HandleDrink()
    ...
}

chanrecv进入recvq对应的golang处理逻辑在这里:

// runtime/chan.go
	...
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
    ...
    

chansend直接将数据拷贝到recvq对应的golang处理逻辑在这里:

// runtime/chan.go
	...
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
    ...
    

三、方案实现

了解了channel的处理流程,追踪的方案就比较明确了,直接在关键的函数处设置hook点即可。先来看下目标服务:

package main

import (
	"log"
	"net/http"
	"strconv"

	"github.com/gin-gonic/gin"
)

const (
	ALCOHOL = iota + 1001
	COCO
	COFFEE
	TEA
)

type TicketInfo struct {
	Age  int
	Name string
	Type int
}

var ticketChan = make(chan TicketInfo, 10)

func AlcoholH(c *gin.Context) {
	var ticket = TicketInfo{}
	var err error
	ticket.Age, err = strconv.Atoi(c.Query("age"))
	if err != nil {
		c.String(http.StatusOK, "handle failed")
		return
	}
	ticket.Name = c.Query("name")
	ticket.Type = ALCOHOL
	ticketChan <- ticket
	return
}

func TeaH(c *gin.Context) {
	var ticket = TicketInfo{}
	var err error
	ticket.Age, err = strconv.Atoi(c.Query("age"))
	if err != nil {
		log.Println("handle failed, ", err.Error())
		c.String(http.StatusOK, "handle failed")
		return
	}
	ticket.Name = c.Query("name")
	ticket.Type = TEA
	ticketChan <- ticket
	c.String(http.StatusOK, "okay")
	return
}

func HandleDrink() {
	for {
		select {
		case info, ok := <-ticketChan:
			if !ok {
				log.Println("chan closed.")
				return
			}
			log.Println("get ticket")
			switch info.Type {
			case ALCOHOL:
				Alcohol(info)
			case COCO:
				SoftDrink(info)
			case COFFEE, TEA:
				Tea(info)
			default:
				log.Println("unknown drink type")
			}
		}
	}
}

func Alcohol(ticket TicketInfo) {
	var url = "http://localhost/unknown/prepare/hotel"
	http.DefaultClient.Get(url)
	return
}

func SoftDrink(ticket TicketInfo) {
	log.Printf("[%s, %d] drink %d", ticket.Name, ticket.Age, ticket.Type)
	return
}

func Tea(ticket TicketInfo) {
	var url = "http://localhost/unknown/prepare/club"
	http.DefaultClient.Get(url)
	return
}

func main() {
	defer func() {
		close(ticketChan)
	}()

	go HandleDrink()

	var r = gin.Default()
	r.GET("/alcohol", AlcoholH)
	r.GET("/tea", TeaH)

	var srv = &http.Server{
		Addr:    "127.0.0.1:1423",
		Handler: r,
	}
	if srv.ListenAndServe() != nil {
		log.Println("failed to handle service listen")
		return
	}
}

对于这样的一个服务,希望达到示例中的追踪效果,对应的方案为:

#define OFF_TASK_THRD 4992
#define OFF_THRD_FSBASE 40
#define GOID_OFFSET 152

uprobe:./drink-srv:"runtime.runqput"
{
  $prob_mark = "runqput";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  if (@new_go[tid, pid] == 0){
    return;
  }
  $p_goid = @new_go[tid, pid];

  $g = (uint64)(reg("bx"));
  $goid = *(uint64*)($g+GOID_OFFSET);
  @caller_addr[$goid] = @caller_addr[$p_goid];
  @caller_len[$goid]  = @caller_len[$p_goid];
}

uprobe:./drink-srv:"runtime.newproc1"
{
  $prob_mark = "newproc1";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $g = (uint64)(reg("bx"));
  $goid = *(uint64*)($g+GOID_OFFSET);
  if (@caller_addr[$goid] == 0){
    return;
  }
  @new_go[tid, pid] = $goid;
}

// 这里,将 caller 信息和写入 channel 信息的 key 关联起来
uprobe:./drink-srv:"runtime.chansend"
{
  $prob_mark = "chansend";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $cur = (uint64)curtask;
  $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
  $g = *(uint64*)($fsbase-8);
  $goid = *(uint64*)($g+GOID_OFFSET);

  // 如果当前执行goroutine中没有caller,跳过
  if(@caller_addr[$goid] == 0){
    return;
  }

  $chan = (uint64)reg("ax");
  $qcount = *(uint32*)($chan + 0);
  $buf = *(uint64*)($chan+16);
  @send_addr[$chan, $qcount] = @caller_addr[$goid];
  @send_len[$chan, $qcount]  = @caller_len[$goid];

  return;
}


uprobe:./drink-srv:"runtime.chanrecv"
{
  $prob_mark = "chanrecv";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $cur = (uint64)curtask;
  $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
  $g = *(uint64*)($fsbase-8);
  $goid = *(uint64*)($g+GOID_OFFSET);

  $chan = (uint64)reg("ax");
  $qcount = *(uint32*)($chan + 0);
  $buf = *(uint64*)($chan+16);
  if (@send_addr[$chan, $qcount] == 0){
    return;
  }
  @caller_addr[$goid] = @send_addr[$chan, $qcount];
  @caller_len[$goid]  = @send_len[$chan, $qcount];

  return;
}

uprobe:./drink-srv:"runtime.send"
{
  $prob_mark = "send";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $chan = (uint64)reg("ax");
  $sg = (uint64)reg("bx");
  $g = *(uint64*)($sg+0);
  $goid = *(uint64*)($g+GOID_OFFSET);

  $qcount = *(uint32*)($chan+0);

  @caller_addr[$goid] = @send_addr[$chan, $qcount];
  @caller_len[$goid]  = @send_len[$chan, $qcount];

  return;
}


/*
type serverHandler struct {
	srv *Server
}
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
*/
uprobe:./drink-srv:"net/http.serverHandler.ServeHTTP"
{
  $prob_mark = "ServeHTTP";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $cur = (uint64)curtask;
  $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
  $g = *(uint64*)($fsbase-8);
  $goid = *(uint64*)($g+GOID_OFFSET);

  $req_addr = reg("di");
  // offset(Request.URL) = 16
  $url_addr = *(uint64*)($req_addr+16);
  // offset(URL.Path) = 56
  $path_addr = *(uint64*)($url_addr+56);
  $path_len  = *(uint64*)($url_addr+64);

  @caller_addr[$goid] = $path_addr;
  @caller_len[$goid]  = $path_len;

  printf("serve HTTP: %s
", str($path_addr, $path_len));

  return;
}

/*
func (c *Client) do(req *Request) (retres *Response, reterr error) {
*/
uprobe:./drink-srv:"net/http.(*Client).do"
{
  $prob_mark = "do";
  @prob[$prob_mark] = @prob[$prob_mark] + 1;

  $cur = (uint64)curtask;
  $fsbase = *(uint64*)($cur+OFF_TASK_THRD+OFF_THRD_FSBASE);
  $g = *(uint64*)($fsbase-8);
  $goid = *(uint64*)($g+GOID_OFFSET);

  if (@caller_addr[$goid] == 0){
    printf("%d: has no caller.
", $goid);
    return;
  }

  $req_addr = reg("bx");
  // offset(Request.URL) = 16
  $url_addr = *(uint64*)($req_addr+16);
  // offset(URL.Host) = 40
  $host_addr = *(uint64*)($req_addr+40);
  $host_len  = *(uint64*)($req_addr+48);
  // offset(URL.Path) = 56
  $path_addr = *(uint64*)($url_addr+56);
  $path_len  = *(uint64*)($url_addr+64);

  $c_addr = @caller_addr[$goid];
  $c_len  = @caller_len[$goid];

  printf("caller: %s, callee: %s:%s
", str($c_addr, $c_len),
    str($host_addr, $host_len), str($path_addr, $path_len));
}

四、追踪的风险

至此,看起来golang channel是可以追踪的。但是实际上并非如此。比如如下这个示例:

func HandleDrink() {
    for {
        select {
        case info, ok := <-ticketChan:
			...
        default: // 注意这个 stop
        	// no stop here
        }
    }
}

这段逻辑在代码编写、编译阶段均无问题,是一段完全合理的逻辑。当我们试图追踪这段代码时:

$ sudo bpftrace ./drink.bt
Attaching 7 probes...
^C  // 直接停止,没有请求
@prob[chanrecv]: 908571

注意,此时并没有做任何的操作,但是这个chanrecv这个hook点已经触发了数十万次。而我们知道,BPF hook点的触发并非没有开销的。因此,目标的代码在完全合理的情况下,我们的追踪程序会给系统带来很大的负载。这显然是我们需要避免的。

以上,周末愉快~