go

Go MoonMQ使用说明

在上一篇moonmq的介绍中(这里),我仅仅简短的罗列了一些moonmq的设计想法,但是对于如何使用并没有详细说明,公司同事无法很好的使用。

对于moonmq的使用,其实很简单,样例代码在这里,我们只需要处理好broker,consumer以及publisher的关系就可以了。

首先,我们需要启动一个broker,因为moonmq现在只支持tcp的自定义协议,所以broker启动的时候需要指定一个listen address。

#启动broker
./simple_broker -addr=127.0.0.1:11182

启动了broker之后,我们就可以向该broker发送消息

#向test这个queue发送 hello msg
./simple_publisher -addr=127.0.0.1:11182 -queue=test -msg=hello

然后在另一个shell里面接收消息

#接收test这个queue的消息
./simple_consumer -addr=127.0.0.1:11182 -queue=test

#output get msg: hello

如果没有消息,那么consumer就会一直等待,直到接收到消息。

这里详细说一下consumer的实现,

  • 创建一个与broker的连接

      //create a client for use
      client := NewClient(config)
    
      //get a usable connection
      conn, _ := client.Get()
    
  • 绑定queue

      //bind a queue
      //queue name : test
      //routingKey : ""
      //noAck : true
      ch, _ := conn.Bind("test", "", true)
    
  • 接收消息

      //receive msg, block to wait until a msg received
      msg := ch.GetMsg()
      println(msg)
    
  • 回执消息

      //if channel noAck is false, we must ack
      ch.Ack()
    

从上面的例子可以看出,使用moonmq很方便,后续我准备加入http的支持,使其更容易使用。

moonmq的代码在这里https://github.com/siddontang/moonmq

Go的LevelDB接口

leveldb是一个很强悍的kv数据库,自然,我也希望能在go中使用。

如果有官方的go leveldb实现,那我会优先考虑,譬如这个,但是该库文档完全没有,并且在网上没发现有人用于实战环境,对其能否在生产环境中使用打上问号,保险起见,我还是决定不使用。

因为leveldb有c的接口,所以通过cgo能很方便的进行集成,所以我决定采用该种方式,幸运的是,已经有人做了cgo的版本,也就是levigo

使用levigo,需要编译安装leveldb,如果需要压缩支持还需要编译snappy,为了简单,我写了一个构件脚本,如下:

#!/bin/bash
#refer https://github.com/norton/lets/blob/master/c_src/build_deps.sh

#你必须在这里设置实际的snappy以及leveldb源码地址
SNAPPY_SRC=./snappy
LEVELDB_SRC=./leveldb

SNAPPY_DIR=/usr/local/snappy
LEVELDB_DIR=/usr/local/leveldb

if [ ! -f $SNAPPY_DIR/lib/libsnappy.a ]; then
    (cd $SNAPPY_SRC && \
        ./configure --prefix=$SNAPPY_DIR && \
        make && \
        make install)
else
    echo "skip install snappy"
fi

if [ ! -f $LEVELDB_DIR/lib/libleveldb.a ]; then
    (cd $LEVELDB_SRC && \
        echo "echo \"PLATFORM_CFLAGS+=-I$SNAPPY_DIR/include\" >> build_config.mk" >> build_detect_platform &&
        echo "echo \"PLATFORM_CXXFLAGS+=-I$SNAPPY_DIR/include\" >> build_config.mk" >> build_detect_platform &&
        echo "echo \"PLATFORM_LDFLAGS+=-L $SNAPPY_DIR/lib -lsnappy\" >> build_config.mk" >> build_detect_platform &&
        make SNAPPY=1 && \
        make && \
        mkdir -p $LEVELDB_DIR/include/leveldb && \
        install include/leveldb/*.h $LEVELDB_DIR/include/leveldb && \
        mkdir -p $LEVELDB_DIR/lib && \
        cp -af libleveldb.* $LEVELDB_DIR/lib)
else
    echo "skip install leveldb"
fi

function add_path()
{
  # $1 path variable
  # $2 path to add
  if [ -d "$2" ] && [[ ":$1:" != *":$2:"* ]]; then
    echo "$1:$2"
  else
    echo "$1"
  fi
}

export CGO_CFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include"
export CGO_LDFLAGS="-L$LEVELDB_DIR/lib -L$SNAPPY_DIR/lib -lsnappy"
export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $SNAPPY_DIR/lib)
export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $LEVELDB_DIR/lib)

go get github.com/jmhodges/levigo 

对于leveldb在go里面的使用,levigo做了很好的封装,但是有一点我不怎么习惯,在leveldb中,对于read和write的操作,都需要传入一个Option的东西,这玩意大多数时候都是一个默认Option对象,没必要每次在go里面进行创建删除。所以我对其进行了封装,提供了如下的接口,这样使用的都是默认的option。

func (db *DB) Put(key, value []byte) error 
func (db *DB) Get(key []byte) ([]byte, error)
func (db *DB) Delete(key []byte) error 

同时对于iterator,我参考c++的模型,提供了iterator以及reverse_iterator两种模式,如下:

func (db *DB) Iterator(begin []byte, end []byte, limit int) *Iterator 
func (db *DB) ReverseIterator(rbegin []byte, rend []byte, limit int) *Iterator 

具体的代码在这里

Go使用pprof调试Goroutine

有一段时间,我们的推送服务socket占用很不正常,我们自己统计的同时在线就10w的用户,但是占用的socket竟然达到30w,然后查看goroutine的数量,发现已经60w+。

每个用户占用一个socket,而一个socket,有read和write两个goroutine,简化的代码如下:

c, _ := listerner.Accept()

go c.run()

func (c *conn) run() {
    go c.onWrite()
    c.onRead()
}

func (c *conn) onRead() {
    stat.AddConnCount(1)

    //on something

    stat.AddConnCount(-1)

    //clear
    //notify onWrite to quit
}

当时我就怀疑,用户同时在线的统计是正确的,也就是之后的clear阶段出现了问题,导致两个goroutine都无法正常结束。在检查代码之后,我们发现了一个可疑的地方,因为我们不光有自己的统计,还会将一些统计信息发送到我们公司的统计平台,代码如下:

ch = make([]byte, 100000)
func send(msg []byte) {
    ch <- msg
}

//在另一个goroutine的地方,
msg <- msg
httpsend(msg)

我们channel的缓存分配了10w,如果公司统计平台出现了问题,可能会导致channel阻塞。但到底是不是这个原因呢?

幸运的是,我们先前已经在代码里面内置了pprof的功能,通过pprof goroutine的信息,发现大量的goroutine的当前运行函数在httpsend里面,也就是说,公司的统计平台在大并发下面服务不可用,虽然我们有http超时的处理,但是因为发送的数据量太频繁,导致整体阻塞。

临时的解决办法就是关闭了统计信息的发送,后续我们会考虑将其发送到自己的mq上面,虽然也可能会出现mq服务不可用的问题,但是说句实话,比起自己实现的mq,公司的统计平台更让我不可信。

这同时也给了我一个教训,访问外部服务一定要好好处理外部服务不可用的问题,即使可用,也要考虑压力问题。

对于pprof如何查看了goroutine的问题,可以通过一个简单的例子说明:

package main

import (
    "net/http"
    "runtime/pprof"
)

var quit chan struct{} = make(chan struct{})

func f() {
    <-quit
}

func handler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/plain")

    p := pprof.Lookup("goroutine")
    p.WriteTo(w, 1)
}

func main() {
    for i := 0; i < 10000; i++ {
        go f()
    }

    http.HandleFunc("/", handler)
    http.ListenAndServe(":11181", nil)
}

这上面的例子中,我们启动了10000个goroutine,并阻塞,然后通过访问http://localhost:11181/,我们就可以得到整个goroutine的信息,仅列出关键信息:

goroutine profile: total 10004

10000 @ 0x186f6 0x616b 0x6298 0x2033 0x188c0
#    0x2033    main.f+0x33    /Users/siddontang/test/pprof.go:11

可以看到,在main.f这个函数中,有10000个goroutine正在执行,符合我们的预期。

在go里面,还有很多运行时查看机制,可以很方便的帮我们定位程序问题,不得不赞一下。

Go Timer实现

在go自带的timer实现中,采用的是通常的最小堆的方式,具体可以参见这里

最小堆能够提供很好的定时精度,但是,在实际情况中,我们并不需要这样高精度的定时器,譬如对于一个连接,如果它在2分钟以内没有数据交互,我们就将其删除,2分钟并不需要那么精确,多几秒少几秒都无所谓的。

以前我们单独实现了一个timingwheel,采用的是channel close的方式来处理低精度,超大量timer定时的问题,详见这里

但是timingwheel只有After接口,远远不能满足实际的需求,于是我按照linux timer的实现方式,依葫芦画瓢,弄了一个go版本的实现。linux timer的实现,参考这篇

后续用go timer来表示我自己实现的timer。

在linux中,我们使用tick来表示一次中断的时间,用jiffies来表示系统自启动以来流逝的tick次数。在go timer中,我们在创建一个wheel的时候,需要指定一次tick的时间,如下:

func NewWheel(tick time.Duration) *Wheel

Wheel是go timer统一对timer进行管理的地方。对于每一次tick,我们采用go自带的ticker进行模拟。

为了便于外部使用,我仍然提供的是跟go自己timer一样的接口,譬如:

func NewTimer(d time.Duration) *Timer

在NewTimer中,参数d是一个time duration,我们还需要根据tick来进行换算,得到go timer中实际的expires,也就是在多少次jiffies后该timer触发。

譬如,NewTimer参数为10s,tick为1s,那么经过10个jiffies之后,该timer就会超时触发。如果tick为500ms,那么需要经过20个jiffies之后,该timer才会被触发。

所以timer超时jiffies的计算如下:

expires = wheel.jiffies + d / wheel.tick

详细的代码在https://github.com/siddontang/golib/tree/master/timer

在Go中使用JSON作为主要配置

最近在用go重构,在先前的代码中,我们使用的ini文件进行配置,但是因为很多历史遗留问题,导致配置混乱,维护困难,自然也需要考虑重构了。

通用配置格式

通用的配置格式有很多,常用的就有ini,json,yaml,xml等,当然为了通用我们不考虑自定义的配置格式。那如何选择呢?

首先,xml我们就不用考虑了,到现在为止我都没觉得用这玩意配置起来有多方便,反而很臃肿,可能java系的童鞋会比较青睐。

再来考虑ini,ini文件对于简单应用的配置可以说是非常方便的,如果配置没有太多的层次结构,使用ini就能完全满足我们的需要,即使有,我们也能够通过加入特定前缀来解决。譬如,我们可能有如下redis配置:

[ModuleA]

persistent_redis_addr = 127.0.0.1:6379
persistent_redis_password = admin

cache_redis_addr = 127.0.0.1:6380
cache_redis_password = admin

然后,需求变化了,我们需要另一个持久化redis服务来做相关事情,于是配置就可能变成了下面这样:

[ModuleA]

persistent_redis_addr = 127.0.0.1:6379
persistent_redis_password = admin

persistent2_redis_addr = 127.0.0.1:6379
persistent2_redis_password = admin

cache_redis_addr = 127.0.0.1:6380
cache_redis_password = admin

虽然通过前缀命名能解决层级问题,但总觉得是对程序员不怎么友好的。

why json

剩下的就是json和yaml,可以这么说,这两个都算是比较好的轻量级配置格式,而且对程序员非常友好,而且go里面都可以通过定义struct,将层级结构的配置映射到相应的struct里面。

但是我还是决定选择json作为我们go代码的默认配置格式,最主要的原因在于go的json包有一个杀手级别的RawMessage实现。而这个在我能找到的yaml包中没有。

RawMessage主要是用来告诉go延迟解析用的。当我们定义了某一个字段为RawMessage之后,go就不会解析这段json,这样我们就可以将其推后到各自的子模块单独解析。

假设有一个功能,后台存储可能是redis或者mysql,但是只会使用一个,可能我们会按照如下方式写配置:

redis_store : {
    addr : 127.0.0.1
    db : 0
},

mysql_store : {
    addr : 127.0.0.1
    db : test
    password : admin
    user : root
}

store : redis

对应的class为

type Config struct {
    RedisStore struct {
        Addr string
        DB int
    }

    MysqlStore Struct {
        Addr string
        DB string
        Password string
        User string
    }

    Store string
}

如果这时候我们在增加了一种新的store,我们需要在Config文件里面在增加一个新的field,但是实际我们只会使用一种store,并不需要写这么多的配置。

我们可以使用RawMessage来处理:

type Config struct {
    Store string
    StoreConfig json.RawMessage
}

如果使用redis,对应的配置文件为

store: redis
store_config: {
    addr : 127.0.0.1
    db : 0
}

如果使用mysql,对应的配置文件为

store: mysql
store_config: {
    addr : 127.0.0.1
    db : test
    password : admin
    user : root
}

go读取配置文件之后,并不会处理RawMessage对应的东西,而是由我们代码自己对应的store模块去处理。这样无论配置文件怎么变动,store模块做了什么变动,都不会影响Config类。

而在各个模块中,我们只需要自己定义相关config,然后可以将RawMessage直接解析映射到该config上面,譬如,对于redis,我们在模块中有如下定义

type RedisConfig config {
    Addr string `json:"addr"`
    DB int `json:"db"`
}

func NewConfig(m json.RawMessage) *RedisConfig {
    c := new(RedisConfig)

    json.Unmarshal(m, c)

    return c
}

json的不足

使用json也还有很多蛋疼的地方,最大的问题就在于注释,在json中,可不能这样写:

{
    //this is a comment
    /*this is a comment*/ 
}

但是,我们又不可能不写一点注释来说明配置项是干啥的,所以,通常采用的是引入一个comment字段的方式,譬如:

{
    "_comment" : "this is a comment",
    "key" : "value"
}

另外,json还需要注意的就是写的时候最后一项不能加上逗号,这样的json会因为格式错误无法解析的。

{
    "key" : "value",
}

最后那个逗号可是不能要的,但是实际写配置的时候我们可是经常性的随手加上了。

但是,总的来说,json对于go的项目还是很友好的,我不光在项目中推行了,在自己的开源项目中,也大量的采用了json作为主要的配置文件。

Go中Slice与String的无内存copy

在go里面,string和slice的互换是需要进行内存拷贝的,虽然在底层,它们都只是用 pointer + len来表示的一段内存。

通常,我们不会在意string和slice的转换带来的内存拷贝性能问题,但是总有些地方需要关注的,刚好在看vitess代码的时候,发现了一种很hack的做法,string和slice的转换只需要拷贝底层的指针,而不是内存拷贝。当然这样做的风险各位就要好好担当了:

func String(b []byte) (s string) {
    pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
    pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
    pstring.Data = pbytes.Data
    pstring.Len = pbytes.Len
    return
}

func Slice(s string) (b []byte) {
    pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
    pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
    pbytes.Data = pstring.Data
    pbytes.Len = pstring.Len
    pbytes.Cap = pstring.Len
    return
}

在我的测试例子中,slice转string之后,如果slice的值有变化,string也会跟着改变,如下:

b := []byte("hello world")

a := String(b)

b[0] = 'a'

println(a)  //output  aello world

但是string转slice之后,就不能更改slice了,如下:

a := "hello world"

b := Slice(a)

b[0] = 'a'  //这里就等着崩溃吧

//但是可以这样,因为go又重新给b分配了内存
b = append(b, "hello world"…)

上面为什么会崩溃我猜想可能是string是immutable的,可能对应的内存地址也是不允许改动的。

另外,上面这个崩溃在defer里面是recover不回来的,真的就崩溃了,原因可能就跟c的非法内存访问一样,os不跟你玩了。

Go Log模块开发

在go里面,虽然有log模块,但是该模块提供的功能并不强,譬如就没有我们常用的level log功能,但是自己实现一个log模块也并不困难。

对于log的level,我们定义如下:

const (
    LevelTrace = iota
    LevelDebug
    LevelInfo
    LevelWarn
    LevelError
    LevelFatal
)    

相应的,提供如下几个函数:

func Trace(format string, v ...interface{}) 
func Debug(format string, v ...interface{}) 
func Info(format string, v ...interface{}) 
func Warn(format string, v ...interface{}) 
func Error(format string, v ...interface{}) 
func Fatal(format string, v ...interface{}) 

另外,对于一个log,我们可能将其写入不同的地方,譬如可能直接输出到stdout,或者写文件,或者写socket,在创建特定logger的时候,我们需要指定对应的handler,用来实现不同的写入方式。

handler的定义如下:

type Handler interface {
    Write(p []byte) (n int, err error)
    Close() error
}

我们可以参考python的log模块,实现几个通用的handler,譬如StreamHandler,FileHandler,TimeRotatingFileHandler。

log模块的实现https://github.com/siddontang/golib/tree/master/log

MoonMQ 高性能消息队列介绍

介绍

moonmq是一个用go实现的高性能消息队列系统,后续准备用于我们消息推送服务以及各个后台的异步任务。

在设计上面,moonmq主要借鉴了rabbitmq以及rocketmq相关的思想,但是做了很多减法,毕竟我不是要设计成一个非常通用的mq。

名词解释

  • publisher,消息生产者
  • consumer,消息消费者
  • broker,消息中转站
  • queue,消息存储队列

publisher给一个命名的queue发送消息msg,broker负责将msg存放在queue里面。

consumer可以关注自己感兴趣的queue,这样当queue里面有消息的时候,broker就会将该消息推送给该consumer。

推拉模型

在rocketmq里面,支持的是pull msg,而rabbitmq则是支持push和pull msg。moonmq只支持push msg。主要有如下考量:

  • 当consumer在线的时候,push是最及时的,因为这时候铁定能把msg push成功。
  • 当consumer离线,broker会保存离线消息,当consumer上线之后,broker仍然按照push的方式将离线消息进行推送。

另外,因为moonmq后续会支持我们的消息推送系统,如果采用pull模型,几十万的consumer定期的pull,我有点担心moonmq会吃不消。

消息类型

moonmq将msg分为direct和fanout,fanout就是广播消息,moonmq会将任何订阅了该queue的consumer进行msg push。

如果msg的type为direct,moonmq将会采用轮询的方式,选择一个consumer进行msg push。

消息优先级

moonmq不支持消息优先级,处理起来会很麻烦,而且通常我们并不需要特别精细的优先级控制。

可以通过一个简单的方式实现粗粒度的优先级控制:

  • 设置queue1,queue2,queue3三个队列,queue1用来处理保存优先级最高的消息,queue2次之,queue3最低
  • publisher发送消息的时候根据优先级发送到指定的queue上面去
  • 我们可以有多个consumer处理queue1的消息,譬如3个,然后用2个处理queue2的,1个处理queue1的,这样实现优先级控制。

消息过滤

moonmq通过routing key来进行消息过滤。

publisher在给特定queue发送msg的时候,还可以指定对应的routing key,只有关注了该queue同时也指定了相同的routing key的consumer才会收到该msg。

Ack

moonmq支持ack机制,当push一个msg到consumer的时候,consumer必须回应一个ack,moonmq才认为msg push成功。如果长时间没有ack,则moonmq会重新选择一个consumer再次发送。

ack能够很大程度的保证消息推送的成功率,但是对于消息的快速推送会有影响,所以moonmq也支持no ack模式,这种模式下moonmq只要发送成功了msg,就认为push成功,无需等待ack的回执。

延迟消息

这个现在还没支持,后续是情况而定

定时消息

难度比较大,不会实现

协议

moonmq采用的是类似rocketmq的协议,如下:

|total length(4 bytes)|header length(4 bytes)|header json|body|

total length = 4 + len(header json) + len(body)
header length = len(header json)

在moonmq里面,我们使用Proto来定义协议

type Proto struct {
    Method uint32 `json:"method"`

    Fields map[string]string `json:"fields"`

    Body []byte `json:"-"`
}

moonmq的任何协议,都需要带上method,我们通过method进行实际的消息处理。

moonmq的method参考rabbitmq,有如下几种类型的method:

  • 同步request method,客户端在发送request method之后必须等待对应的response method,在等待的过程中也能够处理push,error等异步method。
  • 同步response method,对应特定的request method。
  • 异步method,发送之后无需等待。

现阶段,moonmq支持如下同步method:

  • auth, auth_ok
  • publish, publish_o
  • bind, bind_ok
  • unbind, unbind_ok

同时支持如下异步method:

  • push
  • error
  • heartbeat
  • ack

后续

这只是moonmq的一个简单介绍,后续我们会不断完善moonmq,争取也能成为一个不错的mq产品。

moonmq的代码在这里https://github.com/siddontang/moonmq,期待大家的反馈。

在go中使用linked channels进行数据广播

原文在这里(需翻墙),为啥想要翻译这篇文章是因为在实际中也碰到过如此的问题,而该文章的解决方式很巧妙,希望对大家有用。

在go中channels是一个很强大的东西,但是在处理某些事情上面还是有局限的。其中之一就是一对多的通信。channels在多个writer,一个reader的模型下面工作的很好,但是却不能很容易的处理多个reader等待获取一个writer发送的数据的情况。

处理这样的情况,可能的一个go api原型如下:

type Broadcaster …

func NewBroadcaster() Broadcaster
func (b Broadcaster) Write(v interface{})
func (b Broadcaster) Listen() chan interface{}

broadcast channel通过NewBroadcaster创建,通过Write函数进行数据广播。为了监听这个channel的信息,我们使用Listen,该函数返回一个新的channel去接受Write发送的数据。

这套解决方案需要一个中间process用来处理所有reader的注册。当调用Listen创建新的channel之后,该channel就被注册,通常该中间process的主循环如下:

for {
    select {
        case v := <-inc:
            for _, c := range(listeners) {
                c <- v
            }
        case c := <- registeryc:
            listeners.push(c)
    }
}

这是一个通常的做法,(译者也经常这么干)。但是该process在处理数据广播的时候会阻塞,直到所有的readers读取到值。一个可选的解决方式就是reader的channel是有buffer缓冲的,缓冲大小我们可以按需调节。或者当buffer满的时候我们将数据丢弃。

但是这篇blog并不是介绍上面这套做法的。这篇blog主要提出了另一种实现方式用来实现writer永远不阻塞,一个慢的reader并不会因为writer发送数据太快而要考虑分配太大的buffer。

虽然这么做不会有太高的性能,但是我并不在意,因为我觉得它很酷。我相信我会找到一个很好的使用地方的。

首先是核心的东西:

type broadcast struct {
    c chan broadcast
    v interface{}
}

这就是我说的linked channel,但是其实是Ouroboros data structure。也就是,这个struct实例在发送到channel的时候包含了自己。

从另一方面来说,如果我有一个chan broadcast类型的数据,那么我就能从中读取一个broadcast b,b.v就是writer发送的任意数据,而b.c,这个原先的chan broadcast,则能够让我重复这个过程。

另一个可能让人困惑的地方在于一个带有缓冲区的channel能够被用来当做一个1对多广播的对象。如果我定义如下的buffered channel:

var c = make(chan T, 1)

任何试图读取c的process都将阻塞直到有数据写入。

当我们想广播一个数据的时候,我们只是简单的将其写入这个channel,这个值只会被一个reader给获取,但是我们约定,只要读取到了数据,我们立刻将其再次放入该channel,如下:

func wait(c chan T) T {
    v := <-c
    c <-v
    return v
}

结合上面两个讨论的东西,我们就能够发现如果在broadcast struct里面的channel如果能够按照上面的方式进行处理,我们就能实现一个数据广播。

代码如下:

package broadcast

type broadcast struct {
    c   chan broadcast;
    v   interface{};
}

type Broadcaster struct {
    // private fields:
    Listenc chan chan (chan broadcast);
    Sendc   chan<- interface{};
}

type Receiver struct {
    // private fields:
    C chan broadcast;
}

// create a new broadcaster object.
func NewBroadcaster() Broadcaster {
    listenc := make(chan (chan (chan broadcast)));
    sendc := make(chan interface{});
    go func() {
        currc := make(chan broadcast, 1);
        for {
            select {
            case v := <-sendc:
                if v == nil {
                    currc <- broadcast{};
                    return;
                }
                c := make(chan broadcast, 1);
                b := broadcast{c: c, v: v};
                currc <- b;
                currc = c;
            case r := <-listenc:
                r <- currc
            }
        }
    }();
    return Broadcaster{
        Listenc: listenc,
        Sendc: sendc,
    };
}

// start listening to the broadcasts.
func (b Broadcaster) Listen() Receiver {
    c := make(chan chan broadcast, 0);
    b.Listenc <- c;
    return Receiver{<-c};
}

// broadcast a value to all listeners.
func (b Broadcaster) Write(v interface{})   { b.Sendc <- v }

// read a value that has been broadcast,
// waiting until one is available if necessary.
func (r *Receiver) Read() interface{} {
    b := <-r.C;
    v := b.v;
    r.C <- b;
    r.C = b.c;
    return v;
}

下面就是译者的东西了,这套方式实现的很巧妙,首先它解决了reader register以及unregister的问题。其次,我觉得它很好的使用了流式化处理的方式,当reader读取到了一个值,reader可以将其传递给下一个reader继续使用,同时自己在开始监听下一个新的值的到来。

译者自己的一个测试用例:

func TestBroadcast(t *testing.T) {
    b := NewBroadcaster()

    r := b.Listen()

    b.Write("hello")

    if r.Read().(string) != "hello" {
        t.Fatal("error string")
    }

    r1 := b.Listen()

    b.Write(123)

    if r.Read().(int) != 123 {
        t.Fatal("error int")
    }

    if r1.Read().(int) != 123 {
        t.Fatal("error int")
    }

    b.Write(nil)

    if r.Read() != nil {
        t.Fatal("error nit")
    }

    if r1.Read() != nil {
        t.Fatal("error nit")
    }
}

当然,这套方式还有点不足,主要就在于Receiver Read函数,并不能很好的与select进行整合,具体可以参考该作者另一篇blog http://rogpeppe.wordpress.com/2010/01/04/select-functions-for-go/

Mixer MySQL词法分析

介绍

mixer希望在proxy这层就提供自定义路由,sql黑名单,防止sql注入攻击等功能,而这些的基石就在于将用户发上来的sql语句进行解析。也就是我最头大的词法分析和语法分析。

到现在为止,我只是实现了一个比较简单的词法分析器,用以将sql语句分解成多个token。而对于从token在进行语法分析,构建sql的AST,我现在还真没啥经验(编译原理太差了),急需牛人帮忙。

所以,这里只是简单介绍一下mixer的词法分析。

tokenize

在很多地方,我们都需要进行词法分析,通常会有几种方式:

  • 使用一个强大的工具,譬如lex,mysql-proxy就用的这种方式
  • 使用正则表达式
  • state machine

对于使用工具,我觉得有一个不怎么好的地方在于学习成本,譬如我用lex的时候就需要学习它的语法,同时通过工具生成的代码可读性都不怎么好,代码量大,更严重的是可能会比较慢。所以mysql自身也是自己实现一个词法分析模块。

而对于正则表达式,性能问题可能是一个很需要考虑的,而且复杂度并不比使用类似lex这样的工具低。

状态机可能是我觉得自己动手实现词法解析一个很好的方式,对于sql的词法解析,我觉得使用state machine的方式来自己写一个难度并不大,所以mixer自己实现了一个。

state machine

通常,一个状态机的实现采用的是state + action + switch的做法,可能如下:

switch state {
    case state1:
        state = action1()
    case state2:
        state = action2()
    case state3:
        state = action3()
}

对于一个state,我们通过switch知道它将会由哪一个action进行处理,而对于每一个action,我们则知道执行完成之后下一个state是什么。

对于上面的实现,如果state过多,可能会导致太多的case语句,我们可以通过state function进行简化。

一个state function就是执行当前的state action,并且直接返回下一个state function。

我们可以这样做:

type stateFn func(*Lexer) stateFn

for state := startState; state != nil {
    state = state(lexer)
}

所以我们需要实现的就是每一个state function以及对应的它的下一个需要执行的state function。

mixer lexer

mixer的词法分析实现主要参考这个。主要实现在parser模块

对于一个lexer,需要提供的是NextToken的功能,供外部获取下一个token,从而进行后续的操作(譬如语法分析)。

lexer的next token如下:

func (l *Lexer) NextToken() (Token, error) {
    for {
        select {
            case t := <-l.tokens:
                return t, nil
            default:
                if l.state == nil {
                    return Token{TK_EOF, ""}, l.err
                }
                l.state = l.state(l)
                if l.err != nil {
                    return Token{TK_UNKNOWN, ""}, l.err
                }
        }
    }
}

tokens是一个channel,每次state解析的token都会emit到这个channel上面,供NextToken获取,如果channel为空了,则再次调用state function。

可以看到,用go实现一个词法解析是很容易的事情,剩下的就是写相应的state function用来解析sql。

todo

mixer的词法分析还有很多不完善的地方,譬如对于科学计数法数值的解析就不完善,后续准备参考mysql官方的词法分析模块在好好完善一下。

mixer的代码在这里https://github.com/siddontang/mixer,希望感兴趣的童鞋共同完善。