R·ex / Zeng


音游狗、安全狗、攻城狮、业余设计师、段子手、苦学日语的少年。

利用 Trie 树的变种优化带参数路由的匹配

注意:本文发布于 1805 天前,文章中的一些内容可能已经过时。

背景

我目前负责的平台需要做一个转发功能,大概的需求如下:

  1. 把前端发过来的某些 URL 直接转发到第三方系统;
  2. 需要在转发过程中添加带有用户和国家信息的 JWT header 以供第三方系统使用;
  3. 需要在我的平台上为不同 URL 做鉴权,例如 URL a 需要 rule 的读权限,URL b 需要 rate 的写权限;
  4. 第三方系统的管理员可以在我的平台上自由配置转发规则,例如把 /admin 转发到他们的 /user/admin
  5. URL 需要支持带参数,例如 /admin/:email 可以转发到 /user/admin/:email,可以保证参数名称和数量相同。

前两点都很简单,查一下 API 用一下库就可以了,3、4 两点只需要建一张表把规则写进去而不是写死在代码里也不算难,最后一点其实如果不考虑效率的话,可以这样做(使用了 Gin;代码仅为思路,不保证运行结果准确):

func matchProxy(proxy string, pathArr []string) (bool, map[string]string) {
    proxyArr := strings.Split(proxy, "/")
    if len(pathArr) != len(proxyArr) {
        return false, make(map[string]string)
    }
    params := make(map[string]string)
    for index, item := range pathArr {
        if proxyArr[index][0] == ':' {
            params[proxyArr[index]] = item
            continue
        }
        if item != proxyArr[0] {
            return false, make(map[string]string)
        }
    }
    return true, params
}

proxies := dao.GetProxies("project")
api.Any("/proxy/*path", func (c *gin.Context) {
    path := c.MustGet("path")
    pathArr := strings.Split(path, "/")
    for _, proxy := range proxies {
        if ok, params := matchProxy(proxy.Path, pathArr); ok {
            // my custom proxy function
            proxyTo(proxy.Target, params, c.Request.URL.RawQuery)
            return
        }
    }
    c.String(404, "Proxy rule not found")
})

把上面的 proxy 从结构体换成简单的 string,然后写了个性能测试:

func BenchmarkMatchProxy(b *testing.B) {
    proxies := []string{
        "/:id/a/b/c/d/e/f/g/h/i",
        "/0/a/b/c/d/e/f/g/h/i",
        "/1/a/b/c/d/e/f/g/h/i",
        "/2/a/b/c/d/e/f/g/h/i",
        "/3/a/b/c/d/e/f/g/h/i",
        "/4/a/b/c/d/e/f/g/h/i",
        "/5/a/b/c/d/e/f/g/h/i",
        "/6/a/b/c/d/e/f/g/h/i",
        "/7/a/b/c/d/e/f/g/h/i",
    }
    for i := 0; i < b.N; i++ {
        path := "/5/a/b/c/d/e/f/g/h/i"
        pathArr := strings.Split(path[1:], "/")
        for _, proxy := range proxies {
            if ok, _ := matchProxy(proxy[1:], pathArr); ok {
                // 200
                break
            }
        }
        // 404
    }
}

结果惨不忍睹:

$ go test -bench .
goos: windows
goarch: amd64
pkg: test
BenchmarkMatchProxy-8             500000              3404 ns/op
PASS
ok      _/E_/Workspace/gotest        1.940s

所以,随着转发规则越来越多(成百上千条),为了支撑起大量的 QPS,就必须要优化匹配效率了,于是就有了本文。

Trie 树的基本概念

按照以前竞赛的思维,如果 URL 不带参数的话,用 Hash 是最好的选择:即使有上万个字符串,在 Hash 中遇到大量冲突的概率也是非常低的,平均时间复杂度就是字符串的长度;而且绝大多数语言都自带了 Hash/Map,所以不用自己手写。但现在带了参数,也就只好换一个数据结构了,竞赛思维又把我导向了 Trie。

Trie 是一个专门用来查找字符串的数据结构,通过共用前缀的方式来节约存储空间,插入新串和匹配某个串的时间复杂度是该串的长度。下面就是一棵存储了 carina、card、redux、rex、react、read 六个字符串的 Trie,应该比较易懂:

    root
   /    \
  c      r
  |       \
  a        e
  |       /|\
  r      d x a
 / \    /   / \
i   d  u   c   d
|      |   |
n      x   t
|
a

如果路由不带参数,那么可以用 / 将路由 Split 开,每个元素当作一个“字符”,这样就可以用 Trie 来存了。不过这样会有个问题:这儿的“字符”是不可枚举的,因此只能用 Hash 之类的结构来存,这会导致效率进一步下降。因此如果路由不带参数,能用 Hash 就直接用 Hash 吧。

修改 Trie 的结构

由于某些原因,可能会出现 /feature/:id/change/feature/:type/batchChange 的情况,那么当用户传来一个 /feature/type1/batchChange,看到 type1 没办法知道该进哪一个分支。于是我对 Trie 的节点做了一下修改,将 Children 分成了 Route 和 Param 两个类型:

// RouteTrie is the common trie structure
// WARNING: this is not thread-safe for insertion
type RouteTrie struct {
    Data          interface{}
    Key           string
    RouteChildren map[string]*RouteTrie
    ParamChildren map[string]*RouteTrie
}

在建树的时候,区分一下当前的元素是 Param 还是普通的 Route,插入到不同的 Children 中:

arr := strings.Split(path[1:], "/")
p := current
for index, item := range arr {
    if index == len(arr)-1 {
        // the last one, insert directly
        newNode := &RouteTrie{
            Data:          data,
            Key:           item,
            RouteChildren: make(map[string]*RouteTrie),
            ParamChildren: make(map[string]*RouteTrie),
        }
        if item[0] == ':' {
            if t, ok := p.ParamChildren[item]; ok {
                t.Data = data
            } else {
                p.ParamChildren[item] = newNode
            }
        } else {
            if t, ok := p.RouteChildren[item]; ok {
                t.Data = data
            } else {
                p.RouteChildren[item] = newNode
            }
        }
    } else {
        // maybe the node not exists
        newNode := &RouteTrie{
            Data:          nil,
            Key:           item,
            RouteChildren: make(map[string]*RouteTrie),
            ParamChildren: make(map[string]*RouteTrie),
        }
        if item[0] == ':' {
            if _, ok := p.ParamChildren[item]; !ok {
                p.ParamChildren[item] = newNode
            }
            p = p.ParamChildren[item]
        } else {
            if _, ok := p.RouteChildren[item]; !ok {
                p.RouteChildren[item] = newNode
            }
            p = p.RouteChildren[item]
        }
    }
}

至于查找,就需要费一点功夫了,由于带了参数,我无法知道需要进入哪一个分支,于是我做了如下规定:

  1. 如果用 RouteChildren 可以匹配,那就匹配,所以如果有 /user/:email/user/all,传入 /user/all 会匹配到后一个;
  2. 否则,如果是最后一次匹配,直接匹配第一个 ParamChildren;
  3. 否则需要在 ParamChildren 里面做一遍搜索。

因为大多数情况下出现不同 ParamChildren 接 RouteChildren 的情况不算太多,带有分支的 Param 的位置一般也不会特别靠前,因此可以认为需要搜索的子树规模比较小,还在可以忍受的范围之内。相关代码如下:

func (current *RouteTrie) findNode(arr []string) (*RouteTrie, []string, []string) {
    item := arr[0]
    // 2. the last one
    if len(arr) == 1 {
        if target, ok := current.RouteChildren[item]; ok {
            return target, []string{}, []string{}
        }
        for _, target := range current.ParamChildren {
            return target, []string{target.Key}, []string{item}
        }
        return nil, []string{}, []string{}
    }
    // 1. use RouteChildren
    if target, ok := current.RouteChildren[item]; ok {
        return target.findNode(arr[1:])
    }
    // 3. search in ParamChildren
    for _, target := range current.ParamChildren {
        if t, keys, values := target.findNode(arr[1:]); t != nil {
            return t, append(keys, target.Key), append(values, item)
        }
    }
    return nil, []string{}, []string{}
}

返回的三个值分别是:匹配到的节点、Params 的 Key、Params 的 Value,外层函数可以通过这些信息构建出 Target URL 并转发。

如果想加速查找,最简单的解决办法就是使用非递归,并提前分配好 Keys 和 Values 的空间,但是写起来实在是不好看(也不好写),于是我放弃了。

性能测试

性能测试的代码如下:

// Use this command to perform a benchmark test:
// $ go test -bench=.

func BenchmarkAddStatic(b *testing.B) {
    trie := NewRouteTrie()
    routes := []string{"/0", "/1", "/2", "/3", "/4", "/5", "/6", "/7"}
    for i := 0; i < b.N; i++ {
        trie.Add(routes[i&7], i)
    }
}

func BenchmarkAddDynamic(b *testing.B) {
    trie := NewRouteTrie()
    for i := 0; i < b.N; i++ {
        trie.Add("/"+strconv.Itoa(i), i)
    }
}

func BenchmarkFind(b *testing.B) {
    trie := NewRouteTrie()
    trie.Add("/:id", 1)
    routes := []string{"/0", "/1", "/2", "/3", "/4", "/5", "/6", "/7"}
    for i := 0; i < b.N; i++ {
        trie.Find(routes[i&7])
    }
}

func BenchmarkFindParallel(b *testing.B) {
    trie := NewRouteTrie()
    trie.Add("/:id", 1)
    routes := []string{"/0", "/1", "/2", "/3", "/4", "/5", "/6", "/7"}
    i := 0
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            i++
            trie.Find(routes[i&7])
        }
    })
}

在我的笔记本(i7-7700HQ,Win10 x64)上的结果是这样:

$ go test -bench .
goos: windows
goarch: amd64
BenchmarkAddStatic-8             5000000               229 ns/op
BenchmarkAddDynamic-8            1000000              1587 ns/op
BenchmarkFind-8                  5000000               324 ns/op
BenchmarkFindParallel-8         10000000               102 ns/op
PASS
ok      _/E_/Workspace/gotest        6.315s

BenchmarkAddStatic 因为到了后面大部分情况下都不用重复 Add,因此时间会非常快;BenchmarkAddDynamic 由于每次都是 Add 新路由,因此它的时间才是 Add 的真实时间。不过 Add 操作只会在程序一开始使用,所以不必太关心它的时间。

虽然看起来还可以,但毕竟“字符串长度”都是 1,于是我把其中一个函数改了一下:

func BenchmarkFind(b *testing.B) {
    trie := NewRouteTrie()
    trie.Add("/:id/a/b/c/d/e/f/g/h/i", 1)
    routes := []string{
        "/0/a/b/c/d/e/f/g/h/i",
        "/1/a/b/c/d/e/f/g/h/i",
        "/2/a/b/c/d/e/f/g/h/i",
        "/3/a/b/c/d/e/f/g/h/i",
        "/4/a/b/c/d/e/f/g/h/i",
        "/5/a/b/c/d/e/f/g/h/i",
        "/6/a/b/c/d/e/f/g/h/i",
        "/7/a/b/c/d/e/f/g/h/i",
    }
    for i := 0; i < b.N; i++ {
        trie.Find(routes[i&7])
    }
}

然后结果就变成了:

BenchmarkFind-8                  2000000               688 ns/op

我……好吧其实还是可以忍受的。

扩展:对 Gin Router 的分析

Gin 是 Go 界中非常著名的路由库,不光支持 Params(:email),还支持 CatchAll(*route),功能多,性能也非常高。于是我在一开始就很好奇它是如何实现的。在结合了各种资料以及翻阅了源码之后了解到,Gin 用了一个叫 Radix Tree 的数据结构,基本可以理解为“压缩节点后的 Trie”,例如我之前构造的 Trie 可以用 Radix Tree 这样表示:

      root
     /    \
   car     re
   / \     /|\
ina   d dux x a
             / \
           ct   d

需要修改一下插入算法,在某些情况下可能需要将某一节点拆成两个。以及 Gin 的“字符”真的就是字符(Param 和 CatchAll 必须特殊处理使其单独形成一个节点),而不是我之前说的“用 / Split 之后的每一项”,因此完全可能出现这样的情况:

         root
          |
         /us
        /   \
     er/e    dt/
     /  \      \
xport    ntity  :exchange

Gin 为了提高效率,除了使用了 Radix Tree 而不是 Trie 以外,我还发现它还做了如下三点:

  1. 禁止同一前缀之后接两个不同的 Param,例如 /prefix/:a/prefix/:b,这样可以避免在树上做查找。由于不需要回溯,因此也不需要递归。可以参考 tree.go 中的 getValue 函数。
  2. 由于使用了 Radix Tree,因此对于某个节点来说,每个 Child 的首字母一定是唯一的,于是可以压缩成一个字符串(代码中叫 indices),然后维护一个 Children 数组,数组中第 i 项的首字母刚好是 indices[i]。这相当于实现了一个 O(N) 的 Map,但由于 URL 中字符集的大小最多不到 120,而且绝大部分节点下面的字符集都超级小,因此比起用一个带有大常数的 Map 来说,效率其实要高很多。
  3. 没有使用 strings.Split 而是直接在一个循环中手工提取。由于 Go 提取一个 Slice 几乎不需要时间,因此这比使用了 strings.Split 的思路常数要小一些,当然代价就是代码看起来很丑。

扩展:使用 AC 自动机是否可行

在写 ParamChildren 搜索的时候,我突然想到:既然我写了个变种的 Trie,那么如果我不用搜索,直接选 ParamChildren[0] 来匹配,失配则参考 AC 自动机中失配指针的处理方法,是否可行?

搜了一圈,网上确实有一篇使用 AC 自动机来做通配符匹配的文章(见参考资料),里面附了一个芬兰库奥皮奥大学某教授的讲课 PPT(官方链接似乎已经 404 了,不知道为什么),但我看了一圈,发现由于失配指针的构造规则跟普通 AC 自动机一样,因此只能匹配子串,而不能完全匹配。我自己也没能想到如何构造失配指针以保证完全匹配,因此只能暂时得出这样的结论:不可行。


参考资料

Disqus 加载中……如未能加载,请将 disqus.com 和 disquscdn.com 加入白名单。

这是我们共同度过的

第 3077 天