libtnet

Libtnet 百万连接测试

最近在用go语言做一个挂载大量长连接的推送服务器,虽然已经完成,但是内存占用情况让我不怎么满意,于是考虑使用libtnet来重新实现一个。后续我会使用comet来表明推送服务器。

对于comet来说,单机能支撑大量的并发连接,是最优先考虑的事项。虽然现在业界已经有了很多数据,说单机支撑200w,300w,但我还是先把目标定在100w上面,主要的原因在于实际运行中,comet还会有少量逻辑功能,我得保证在单机挂载100w的基础上,完全能无压力的处理这些逻辑。

CometServer Test

首先我用libtnet简单写了一个comet server。它接受http请求,并将其挂起,过一段随机时间之后在返回200。

void onTimeout(const TimingWheelPtr_t& wheel, const WeakHttpConnectionPtr_t& conn)
{
    HttpConnectionPtr_t c = conn.lock();
    if(c)
    {
        c->send(200);
    } 
}

void onHandler(const HttpConnectionPtr_t& conn, const HttpRequest& request)
{
    int timeout = random() % 60 + 30;
    comet.wheel->add(std::bind(&onTimeout, _1, WeakHttpConnectionPtr_t(conn)), timeout * 1000);
}

int main()
{
    TcpServer s;        
    s.setRunCallback(std::bind(&onServerRun, _1));
    HttpServer httpd(&s);
    httpd.setHttpCallback("/", std::bind(&onHandler, _1, _2));
    httpd.listen(Address(11181));
    s.start(8);
    return 0; 
}

可以看到comet server只是负责了挂载长连接的事情,而没有消息的推送。在实际项目中,我已经将挂载连接和推送消息分开到两个服务去完成。所以这里comet仅仅是挂载连接测试。

测试机器准备

因为linux系统上面一个网卡tcp连接端口数量是有限制的,我们调整ip_local_port_range使其能支撑60000个tcp连接:

net.ipv4.ip_local_port_range = 1024 65535

对于100w连接来说,我们至少需要16台机器,但实际我只有可怜的3台4G内存的虚拟机。所以就要运维的童鞋在每台机器上面装了6块网卡。这样我就能建立100w的连接了。

测试客户端也非常简单,每秒向服务器请求1000个连接,但是需要注意的是,因为一台机器上面有多块网卡,所以在创建socket之后,我们需要将socket绑定到某一块网卡上面。

实际测试中,因为内存问题,每台机器顶多能支撑34w左右的tcp连接,对我来说已经足够,所以也懒得去调优了。

CometServer Linux调优

首先我们需要调整最大打开文件数,在我的机器上面,nr_open最大的值为1048576,对我来说已经足够,所以我将最大文件描述符数量调整为1040000。

fs.file-max = 1040000

然后就是对tcp一些系统参数的调优:

net.core.somaxconn = 60000
net.core.rmem_default = 4096
net.core.wmem_default = 4096
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 4096 16777216
net.ipv4.tcp_wmem = 4096 4096 16777216
net.ipv4.tcp_mem = 786432 2097152 3145728
net.core.netdev_max_backlog = 60000
net.ipv4.tcp_fin_timeout = 15
net.ipv4.tcp_max_syn_backlog = 60000
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_max_orphans = 131072
net.ipv4.tcp_keepalive_time = 1200
net.ipv4.tcp_max_tw_buckets = 60000
net.netfilter.nf_conntrack_max = 1000000
net.netfilter.nf_conntrack_tcp_timeout_established = 1200

对于如何调节这些值,网上都是各有各的说法,建议直接man 7 tcp。我在实践中也会通过查看dmesg输出的tcp错误来动态调节。这里单独需要说明的是tcp buffer的设置,我最小和默认都是4k,这主要是考虑到推送服务器不需要太多太频繁的数据交互,所以需要尽可能的减少tcp的内存消耗。

测试结果

实际的测试比较让我满意。

cometserver test8个进程cpu消耗都比较低,因为有轮训timing wheel然后再发送200的逻辑,所以铁定有cpu消耗,如果只是挂载,cpu应该会更低。

PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                                                       
 4685 root      20   0  187m 171m  652 S 19.9  1.1   2:19.20 cometserver_test                                                                                                
 4691 root      20   0  191m 175m  656 S 16.6  1.1   2:17.80 cometserver_test                                                                                                
 4686 root      20   0  170m 155m  652 S 16.3  1.0   2:09.54 cometserver_test                                                                                                
 4690 root      20   0  183m 167m  652 S 16.3  1.1   2:11.44 cometserver_test                                                                                                
 4692 root      20   0  167m 152m  652 S 16.3  1.0   2:11.29 cometserver_test                                                                                                
 4689 root      20   0  167m 152m  652 S 15.3  1.0   2:03.08 cometserver_test                                                                                                
 4687 root      20   0  173m 158m  652 S 14.3  1.0   2:07.34 cometserver_test                                                                                                
 4688 root      20   0  129m 114m  652 S 12.3  0.7   1:35.77 cometserver_test

socket的统计情况:

[root@localhost ~]# cat /proc/net/sockstat
sockets: used 1017305
TCP: inuse 1017147 orphan 0 tw 0 alloc 1017167 mem 404824
UDP: inuse 0 mem 0
UDPLITE: inuse 0
RAW: inuse 0
FRAG: inuse 0 memory 0

可以看到,总共有1017147个tcp链接,同时占用了将近4G(1017167是页数,需要乘以4096)的内存。

系统内存的情况:

[root@localhost ~]# free
             total       used       free     shared    buffers     cached
Mem:      16334412   11210224    5124188          0     179424    1609300
-/+ buffers/cache:    9421500    6912912
Swap:      4194296          0    4194296

系统有16G内存,还有5G可用,所以不出意外单机应该还能承载更多的tcp连接。

总结

使用libtnet开发的一个简单的comet server支撑了百万级的连接,加深了我对其应用的信心。

libnet地址https://github.com/siddontang/libtnet,欢迎围观。

Libtnet HTTP实现

HTTP

libtnet提供了简单的http支持,使用也很简单。

一个简单的http server:

void onHandler(const HttpConnectionPtr_t& conn, const HttpRequest& request)
{
    HttpResponse resp;
    resp.statusCode = 200;
    resp.setContentType("text/html");
    resp.body.append("Hello World");    
    conn->send(resp);
}

TcpServer s;
HttpServer httpd(&s);
httpd.setHttpCallback("/test", std::bind(&onHandler, _1, _2));
httpd.listen(Address(80));
s.start(4);

我们对http server的“/test”注册了一个handler,当用户访问该url的时候,就会显示”Hello World”。

同样,http client的使用也很简单:

void onResponse(IOLoop* loop, const HttpResponse& resp)
{
    cout << resp.body << endl;
    loop->stop();
}

IOLoop loop;
HttpClientPtr_t client = std::make_shared<HttpClient>(&loop);
client->request("http://127.0.0.1:80/test", std::bind(&onResponse, &loop, _1));
loop.start(); 

这里,我们使用了一个http client,向server请求”/test”的内容,当服务器有响应之后,会调用响应的回调函数。

HTTP Parser

对于http的解析,我采用的是http parser,因为它采用的是流式解析,同时非常容易集成进libtnet。

使用http parser只需要设置相应的回调函数即可。http parser有如下几种回调:

  • message begin,解析开始的时候调用
  • url,解析url的时候调用
  • status complete,http response解析status的时候调用
  • header field,解析http header的field调用
  • header value,解析http header的value调用
  • headers complete,解析完成http header调用
  • body,解析http body调用
  • message complete,解析完成调用

这里特别需要注意的是http header的解析,因为http parser将其拆分成了两种回调,所以我们在处理的时候需要记录上一次header callback是field的还是value的。在解析field的时候,如果上一次是value callback,那我们就需要将上一次解析的field和value保存下来,而该次的解析则是一个新的field了。

另外,http parser还提供了upgrade的支持,所以我们很方便的就能区分该次请求是否为websocket。

Websocket

libtnet也提供了websocket的支持,现阶段,只支持RFC6455

当libtnet通过http parser发现该次请求为websocket的时候,就进入了websocket的流程。websocket的使用也很简单,当握手成功之后,后续的所有通讯就是纯粹的tcp通信了。

一个简单的websocket server:

void onWsCallback(const WsConnectionPtr_t& conn, WsEvent event, const void* context)
{
    switch(event)
    {
        case Ws_CloseEvent:
            break;
        case Ws_MessageEvent:
            {
                const string& str = *(const string*)context;
                conn->send(str);
            }
            break;
        case Ws_PongEvent:
            break;
        default:
            break;
    }
}

TcpServer s;
HttpServer httpd(&s);
httpd.setWsCallback("/push/ws", std::bind(&onWsCallback, _1, _2, _3));    
httpd.listen(Address(80));
s.start();

可以看到,websocket的callback机制也类似于libtnet connection的callback机制,用户需通过event + context的方式来处理该次回调的数据。

libtnet对websocket的frame的处理参照的是tornado的websocket模块。也能够组合多frame的数据,外部只需要关注Ws_MessageEvent即可。

websocket client的实现也很简单:

void onWsConnEvent(const WsConnectionPtr_t& conn, WsEvent event, const void* context)
{
    switch(event)
    {
        case Ws_OpenEvent:
            conn->send("Hello world");
            break;    
        case Ws_MessageEvent:
            {
                const string& msg = *(const string*)context;

                LOG_INFO("message %s", msg.c_str());
                conn->close();                        
            }
            break;
        default:
            break;
    }    
}

IOLoop loop;
WsClientPtr_t client = std::make_shared<WsClient>(&loop);    
client->connect("ws://127.0.0.1:80/push/ws", std::bind(&onWsConnEvent, _1, _2, _3));
loop.start();

Libtnet Connection实现

libtnet只支持IPv4 TCP Connection,之所以这么做都是为了使得实现尽可能的简单。我们主要在Connection类中封装了对tcp连接的操作。

Connection继承自std::enable_shared_from_this,也就意味着外部我们会操作其shared_ptr,libtnet几乎所有的对象都采用智能指针的方式来进行内存管理。

当Connection创建成功之后,会通过IOLoop的addHandler接口,将其绑定到ioloop上面:

ConnectionPtr_t conn = shared_from_this();
m_loop->addHandler(m_fd, TNET_READ, std::bind(&Connection::onHandler, conn, _1, _2));

因为我们直接在std::bind里面使用了shared_ptr,所以ioloop自然引用了该Connection,外部不需要在存储Connection,以防内存泄露。

对于一个connection而言,它只可能有几种状态,

  • Connecting,表明正在尝试连接,发生在connect返回EINPROGRESS。
  • Connected,连接已经建立成功,发生在connect成功或者accept成功。
  • Disconnecting,表明连接正在断开,发生在用户主动调用shutDown之后。
  • Disconnected,连接已经断开,这时候对应的socket也会被close掉。

Event Callback

在Connection中,我们使用一个event callback来绑定相应事件的回调。主要有如下connection event:

  • Conn_EstablishedEvent, 当server accept成功,创建了Connection对象之后,触发。
  • Conn_ConnectEvent,当client connect成功,触发。
  • Conn_ConnectingEvent,当client connect返回EINPROGRESS,触发。
  • Conn_ReadEvent,当连接可读,触发。
  • Conn_WriteCompleteEvent,当发送的数据都发送完毕,触发。
  • Conn_ErrorEvent,当连接有错误发生,触发。
  • Conn_CloseEvent,当连接主动或者被动关闭,触发。

event callback原型如下:

typedef shared_ptr<Connection> ConnectionPtr_t;
typedef std::function<void (const ConnectionPtr_t&, ConnEvent, const void* context)> ConnEventCallback_t;

对应不同的事件,触发的时候context的内容不同。现阶段,只有ReadEvent的时候context为StackBuffer,原型如下:

class StackBuffer
{
public:
    StackBuffer(const char* buf, size_t c) : buffer(buf), count(c) {}

    const char* buffer;
    size_t count;    
};

当连接可读的时候,Connection会将数据读取到栈上面,并用StackBuffer来指代,这样当外部处理ReadEvent的时候就能通过将context转换成StackBuffer获取到读取的数据。

下面简单说明一下一些设计上面的取舍:

  • 为什么只提供一个event callback,而不提供read callback,write complete callback,close callback多个回调接口?

    libtnet的所有callback都采用的是std::function实现,而该对象占用32字节,如果每个event都提供一个对应的callback,那么内存的开销会有点大,同时大部分时候很多callback我们是不感兴趣的。

    还有一个重要的原因在于只提供一个event callback,外部的一些对象就可以通过该callback跟Connection绑定,也就是将其自身的生命周期与Connection绑定在了一起,当Connection删除的时候该对象也自行删除。libtnet中,HttpConnection,WsConnection都是采用该方法,因为对于一个Http连接来说,如果底层的Tcp连接都断开无效了,基于Tcp的Http连接自然就无效了。

  • Connection为什么不缓存读取的数据,而是交由外部callback去处理?

    Connection作为一个底层的类,对于读取的数据,并不知道具体需要如何处理,所以还不如将数据直接发到外层,供上层实际的应用逻辑处理。但是如果后续Connection考虑支持ssl,那么就需要进行缓存数据了。

Write

Connection建立之后,默认只会在ioloop中设置TNET_READ事件,因为epoll采用的水平触发模式,如果直接设置TNET_WRITE事件,那么epoll会一直通知socket可写,但实际上并没有可以发送的数据。

所以,libtnet采用如下的方式进行数据发送:

  • 直接调用writev函数进行数据发送
  • 如果数据未发送完毕,则向ioloop注册TNET_WRITE事件,下次触发可写的时候继续发送,直至发送成功,清除TNET_WRITE事件。

另外,在发送的时候,我们还需要考虑signal pipe的情况,所以需要忽略该singal。使用如下方式:

class IgnoreSigPipe
{
public:
    IgnoreSigPipe()
    {
        signal(SIGPIPE, SIG_IGN);    
    }    
};

static IgnoreSigPipe initObj;

当libtnet启动的时候,就忽略了signal pipe信号。虽然这样做稍微有一点副作用,但大部分时候我们并不需要关注SIGPIPE信号。

Kick Off Connection

通常,为了处理不活跃连接,程序都会将每个connection设置一个timer,如果timer到了该连接仍然没有交互,则会删除该连接,否则则继续更新timer。另一种做法就是提供一个time wheel,将connection放置在该wheel中,如果有交互,则在wheel中移动。

libtnet采用了一种更简单,但是精度比较差的做法。

当server成功创建一个connection之后,将会添加到一个ConnChecker中,checker保存的是该connection的weak_ptr。每隔一段时间,checker检查一批connection:

  • 如果connection weak_ptr无法lock提升至shared_ptr,证明该连接已经删除,checker直接移除。
  • 如果connection处于connecting状态,并且超过了设置的最大连接超时时间,shutDown该connection。
  • 如果connection处于connected状态,并且在一段时间内没有任何交互,shutDown。

ConnChecker的检查间隔以及每次检查步数都可以通过外部设置。使用ConnChecker虽然简单,但是在连接数过大的情况下面,一些过期的connection不能立刻被清理掉。对于这个问题,我觉得可以接受,一个连接一秒之后被关闭还是两秒之后被关闭,差别真的不大。如果我们真的需要对一些connection做精确的时间控制,那直接可以对其使用timer。

Libtnet事件循环

libtnet采用的是prefork + event loop的架构方式,prefork就是server在启动的时候预先fork多个子进程同时工作,而event loop则是基于epoll的事件处理机制。

在最新的linux系统中,提供了timerfd,eventfd,signalfd,加上原先的socket,大部分功能都可以抽象成io事件来处理了。而在libtnet中,这一切的基础就是IOLoop。

类似于tornado,libtnet的IOLoop也提供了相似的接口。其中最核心的就是以下三个:

typedef std::function<void (IOLoop*, int)> IOHandler_t;

int addHandler(int fd, int events, const IOHandler_t& handler);
int updateHandler(int fd, int events);
int removeHandler(int fd);  

对于任意的IO,我们可以注册感兴趣的事件(TNET_READ和TNET_WRITE),并绑定一个对应的callback回调。

callback的回调采用的是std::function的方式,这也就意味着,你可以在外部通过std::bind绑定任意不同的实现,再加上shared_ptr技术模拟闭包。

假设现在我们需要创建了一个socket对象,并将其添加到IOLoop中,我们可以这么做:

std::shared_ptr<Connection> conn = std::make_shared<Connection>(socketfd);

ioloop->addHandler(socketfd, TNET_READ, std::bind(&Connection::onHandler, conn, _1, _2));

这样,当该socket有读事件发生的时候,对应的onHandler就会被调用。在这里,我是用了shared_ptr技术,主要是为了方便进行对象生命周期的管理。

在上面的例子中,因为std::bind的时候引用了conn,只要不将socketfd进行removeHandler,conn对象就会一直存在。所以libtnet在IOLoop内部,自行维护了conn对象的生命周期。外面不需要在将其保存到另一个地方(如果真保存了该shared_ptr的conn,反而会引起内存泄露)。在libtnet的基础模块中,我都使用的是weak_ptr来保存相关对象,每次使用都通过lock来判定是否该对象存活。

在IOLoop内部,我使用一个vector来存放注册的handler,vector的索引就是io的fd。这样,我们通过io的fd就可以非常快速的查找到对应的handler了。为什么可以这样设计,是因为在linux系统中,进程中新建文件的file descriptor都是系统当前最小的可用整数。譬如,我创建了一个socket,fd为10,然后我关闭了该socket,再次新建一个socket,这时候新的socket的fd仍然为最小可用的整数,也就是10。

EPoll

提到linux下面的高性能网络编程,epoll是一个铁定绕不开的话题,关于epoll的使用,网上有太多的讲解,这里就不展开了。

libtnet在Poller中集成了epoll,参考了libev的实现。epoll有两种工作模式,水平触发和边沿触发,各有利弊。libtnet使用的是水平触发方式,主要原因在于水平触发方式在有消息但是没处理的时候会一直通知你处理,实现起来不容易出错,也比较简单。

fork and epoll_create

这里顺便记录一下我在实现prefork模型的时候遇到的一个坑。这个问题就是epoll fd应该在fork之前还是之后创建?

大家都知道,linux fork的时候采用COW(copy on write)方式复制父进程的内容,然后我想当然的以为各个子进程会拥有独立的epoll内核空间,于是在fork之前创建了epoll fd。但是后面我却惊奇的发现一个子进程对epoll的操作竟然会影响另一个子进程。也就是说,各个子进程共享了父进程的epoll内核空间。

所以,epoll fd的创建应该在fork之后,各个子进程独立创建。

Example

Timer

IOLoop提供了一个简单的runAfter函数,用以实现定时器功能,使用非常简单:

void func(IOLoop* loop)
{
    cout << "hello world" << endl;
    loop->stop();
}

IOLoop loop;
loop.runAfter(10 * 1000, std::bind(&func, &loop));
loop.start();

loop启动十秒之后,会打印hello world,然后整个loop退出。更多定制化的timer使用,可以使用libtnet提供的Timer class。

Callback

libtnet是一个单线程单ioloop的模型,但是不排除仍然会有其他线程希望与IOLoop进行通信,所以IOLoop提供了addCallback功能,这是libtnet唯一一个线程安全的函数。因为加入callback是一个很快速的操作,IOLoop使用了spinlock。在IOLoop每次循环的末尾,会将全部的callback取出,依次执行。

void callback(IOLoop* loop)
{
    cout << "tell to exit" << endl;
    loop->stop();
}

IOLoop loop;
loop.addCallback(std::bind(&func, &loop));
loop.start();

高性能网络库Libtnet介绍

libtnet是一个用c++编写的高性能网络库,它在设计上面主要参考tornado,为服务端网络编程提供简洁而高效的接口,非常易于使用。

Echo Server

void onConnEvent(const ConnectionPtr_t& conn, ConnEvent event, const void* context)
{
    switch(event)
    {
        case Conn_ReadEvent:
            {
                const StackBuffer* buffer = static_cast<const StackBuffer*>(context);
                conn->send(string(buffer->buffer, buffer->count));
            }
            break;
        default:
            break;
    }    
}

int main()
{
    TcpServer s;
    s.listen(Address(11181), std::bind(&onConnEvent, _1, _2, _3));

    s.start();

    return 0;
}

当程序启动,服务监听本地11181端口,我们使用telnet测试:

root@tnet:~# telnet 127.0.0.1 11181
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
hello world
hello world

可以看到,libtnet在使用上面非常简单,在listen的时候,指定一个回调函数,当有新的连接到来的时候,该回调函数就会与该connection进行绑定,这样该connection的任何事件都能通过回调进行处理。

在上面那个例子中,我们只关心了connection的ReadEvent,也就是读事件,然后将读取到的所有数据原封不动的转发回去。

Http Server

void onHandler(const HttpConnectionPtr_t& conn, const HttpRequest& request)
{
    HttpResponse resp;
    resp.statusCode = 200;
    resp.body.append("Hello World");

    conn->send(resp);
}

int main()
{
    TcpServer s;
    HttpServer httpd(&s);   
    httpd.setHttpCallback("/abc", std::bind(&onHandler, _1, _2));

    httpd.listen(Address(11181));    
    s.start(4);

    return 0;
} 

当server启动,程序使用本机11181端口提供http服务。我们使用curl测试。

curl http://127.0.0.1:11181/abc

return: hello world

可以看到,使用http server也非常简单,我们只需要对相应的路径绑定一个callback回调,当有请求发生的时候,对应的callback执行。

使用benchmark测试,发现性能也不错RPS能到16000+,在512MB,单核CPU下面进行ab压测,具体可以参考benchmark

Webscoket Server

void onWsCallback(const WsConnectionPtr_t& conn, WsEvent event, const void* context)
{
    switch(event)
    {
        case Ws_MessageEvent:
            {
                const string& str = *(const string*)context;
                conn->send("hello " + str);
            }
            break;
        default:
            break;
    }
}

int main()
{
    TcpServer s;

    HttpServer httpd(&s);

    httpd.setWsCallback("/push/ws", std::bind(&onWsCallback, _1, _2, _3));    

    httpd.listen(Address(11181));

    s.start();

    return 0; 
}

libtnet同样提供了websocket RFC6455的支持,使用方法同http server,只需要对相应的path注册特定的回调,就可以很方便的进行websocket交互。

Client

libtnet不光提供了server层面的相关功能,同时也集成了http clientwebsocket client以及redis client。使得libtnet也能方便的进行客户端网络功能的开发。对于具体的使用,可以参考example

设计上面的考量

libtnet只支持linux版本,虽然做一个跨平台的通用库是一件吸引力非常大的事情,但是综合考虑之后,我决定只做linux版本的,主要有以下几个原因:

  • Linux下面使用prefork + epoll是一种非常高效的网络编程模型,性能强悍,实现简单。虽然unix下面有kqueue,windows下面有IOCP,但是没必要为了适配所有得操作系统将代码写的复杂。
  • Linux在系统层面上面就提供了很多高性能的函数,譬如timerfd,eventfd等,不光性能提升,同时也简化了很多代码实现。
  • Linux在服务器编程领域的使用率很高,专门做精一个平台就够了。

因为高性能的网络编程通常都是使用异步的编程方式,所以经常可以看到代码被异步拆的特别分散,不利于编写。所以我在libtnet里面大量的使用了c++ bind以及shared_ptr技术,用来模拟函数闭包,以及解决对象生命周期管理问题,简化代码的编写。并且我也使用了c++ 0x相关技术,gcc的版本至少要在4.4以上。

对于如何设计以及使用libtnet,后续我会有更加详细的说明。