It event poll

小小网站,记录你的点滴知识!

http://www.zhaozhanfeng.com/blog_57/


上一次用Go第三方类库实现了Redis的连接池,爽到内伤,既然要做一个高并发框架,那Mysql的连接池也必不可少,可惜找了无数文档,就是没有找到关于Mysql连接池的实现方式,百度到的也都是假的,怎么个假法,咱贴代码看看吧。

var MySQLPool chan *mysql.MySQL
func getMySQL() *mysql.MySQL {
    if MySQLPool == nil {
        MySQLPool = make(chan *mysql.MySQL, MAX_POOL_SIZE)
    }
    if len(MySQLPool) == 0 {
        go func() {
            for i := 0; i < MAX_POOL_SIZE/2; i++ {
                mysql := mysql.New()
                err := mysql.Connect("127.0.0.1", "root", "", "wgt", 3306)
                if err != nil {
                    panic(err.String())
                }   
                putMySQL(mysql)
            }   
        }() 
    }   
    return <-MySQLPool
}
func putMySQL(conn *mysql.MySQL) {
    if MySQLPool == nil {
        MySQLPool = make(chan *mysql.MySQL, MAX_POOL_SIZE)
    }   
    if len(MySQLPool) == MAX_POOL_SIZE {
        conn.Close()
        return
    }
    MySQLPool <- conn
}

这个就是百度搜索 “golang mysql 连接池” 得到最多的实现代码,这个代码真是害人无数啊,首先我们看看连接池解决的是什么问题,连接池解决的是TCP连接问题,3次握手,还真别小看这三次握手,他可以让http请求直降10倍性能。下面我们看看连接池是怎么解决这个TCP连接问题的吧。

1、初始化的时候,生成一批默认连接,例如:100

2、当用户访问的时候,从连接池里面取出一个连接,判断这个连接如果没有连接就连接,有连接就直接返回

3、用户使用连接操作数据库,完了返回给连接池,以便其他用户可以直接使用,不需要做二次连接,只有当这个连接失效的时候才要从新连接,所以这个连接应该具备重连功能

基于以上3点做出的连接池,即可解决连接消耗问题,那么上面的代码为什么说它不行?看看他的实现:

1、利用Go的channel生成一个队列来充当连接池,这个池是全局的,以便其他goroutine可以共用

var MySQLPool chan *mysql.MySQL

2、开启一个goroutine来生成一堆连接,并且放到MySQLPool channel里面

3、每次调用putMySQL方法从MySQLPool channel里面取出一个连接进行操作

4、channel里面的连接用完之后又生成一批连接

记住了,他是又生成一批,不是每次用完放回池里,重复利用,所以他这个是一点也没有解决TCP连接损耗问题,这是一个错误的释放,真想砍死这个垃圾作者,不过估计也很难找到本人了,百度上面一堆都是这个代码,抄来抄去的,算了,也想多说了,下面自己来实现一个

废话少说,先上代码:

package epooll
import (
    "time"
    //"fmt"
    //"reflect"
)
var nowFunc = time.Now // for testing
type ConnPool struct {
    // Dial is an application supplied function for creating and configuring a
    // connection
    Dial func() (interface{}, error)
    //Dial func() (*autorc.Conn, error)
    // Maximum number of idle connections in the pool.
    MaxIdle int
    // Maximum number of connections allocated by the pool at a given time.
    // When zero, there is no limit on the number of connections in the pool.
    MaxActive int
    closed bool
    active int
    idle chan interface{}
}
type idleConn struct {
    c interface{}
    t time.Time
}
// 批量生成连接,并把连接放到连接池channel里面
func (this *ConnPool)InitPool() error{
    this.idle = make(chan interface{}, this.MaxActive)
    for x := 0; x < this.MaxActive; x++ {
        //conn, err := this.Dial()
        // 这里返回DB类,而不是返回mysql.Conn,否则DB类的insert,update 这些Active Record类方法没法使用
        db, err := this.Dial()
        //fmt.Println(" --- reflect --- ", reflect.TypeOf(db))
        if err != nil {
            return err
        }
        //this.idle <-conn
        this.idle <-idleConn{t: nowFunc(), c: db}
    }
    return nil
}
// 从连接池里取出连接
func (this *ConnPool)Get() interface{} {
    // 如果空闲连接为空,初始化连接池
    if this.idle == nil {
        this.InitPool()
    }
    // 赋值一下好给下面回收和返回
    //conn := <-this.idle
    //idleConn
    ic := <-this.idle
    // 这里要用 (idleConn) 把interface{} 类型转化为 idleConn 类型的,否则拿不到里面的属性t、c
    conn := ic.(idleConn).c
    //fmt.Println(conn.(*DB).conn)
    //fmt.Println(" --- reflect --- ", reflect.TypeOf(conn))
    // 使用完把连接回收到连接池里
    defer this.Release(conn)
    // 因为channel是有锁的,所以就没必要借助sync.Mutex来进行读写锁定
    // container/list就需要锁住,不然并发就互抢出问题了
    return conn
}
// 回收连接到连接池
func (this *ConnPool)Release(conn interface{}) {
    //this.idle <-conn
    this.idle <-idleConn{t: nowFunc(), c: conn}
}

这里用interface而不是直接用*autorc.Conn是为了方便以后memcache连接池的时候可以直接使用,换句话说,只要是TCP连接的应用,都能使用这个池,这才是一个真正的连接池,因为功能还没实现完善,我就不多说了,自己看代码,下面是初始化连接池方法:

func newMysqlPool() *ConnPool {
    conf := InitConfig()
    poolNum, _ := strconv.Atoi(conf.GetValue("pool", "mysql")) 
    fmt.Printf("初始化 Mysql 连接池,连接数:%d \n", poolNum)
    return &ConnPool{
        MaxActive: poolNum,
        //Dial: func() (*autorc.Conn, error) {
        Dial: func() (interface{}, error) {
            conf := InitConfig()
            host := conf.GetValue("db", "host")
            port := conf.GetValue("db", "port")
            user := conf.GetValue("db", "user")
            pass := conf.GetValue("db", "pass")
            name := conf.GetValue("db", "name")

            //conn := autorc.New("tcp", "", "localhost:3306", "root", "root", "test")
            //conn.Register("set names utf8")
            db, err := InitDB(host+":"+port, user, pass, name)
            return db, err
        },
    } 
}
var MysqlConn = newMysqlPool()

因为代码太多就不都贴出来了,如果想要一个完整的项目,请到我的MVC框架去看,下一节是教大家如何使用这个所谓的高并发MVC框架

框架地址:https://github.com/owner888/epooll


  1. 检查每个case语句
  2. 如果有任意一个chan是send or recv read,那么就执行该block
  3. 如果多个case是ready的,那么随机找1个并执行该block
  4. 如果都没有ready,那么就block and wait
  5. 如果有default block,而且其他的case都没有ready,就执行该default block

最近被日志是折腾得死去活来,写文件无疑效率是最高的,但是分布式又成问题,虽然稍微折腾一下配合NFS,还是可以搞一搞的,但是始终语言设计没有那么方便。

最终决定用redis,换了redis以为就好了,因为内存运行嘛,谁知道tcp连接开销大得一塌糊涂,服务器负载一下子高了许多,使用netstat -an 查看发现一堆的 TIME_WAIT,连ssh到服务器都巨慢无比,所谓天下武功唯快不破,这么慢80岁老太太跳一支广场舞都能给灭了吧。

既然 tcp连接开销这么大,当然首要任务就是解决连接问题,明显一个请求一次连接是很不靠谱的,还不如直接往硬盘写日志呢,当然写日志第一段也说了,不支持分布式,业务分配没那么好。

那么,能不能先定只用一个连接呢,这显然是不行的,一个连接多个php-fpm互掐也会造成瓶颈,那如果是一个php-fpm一个连接呢?显然这是可取的,于是用了php的redis长连接,php-fpm.ini 配置如下: 

pm = static 
pm.max_children = 400 
pm.max_requests = 10240

php使用 pconnect 来代替 connect

$redis = new redis();
$redis->pconnect('192.168.0.2', 6379); // 内网服务器
$redis->lpush('list', 'Just a test');

上面的配合意思是默认开启400个php-fpm来处理nginx反向代理过来的请求,一个php-fpm处理10240个请求,也就是说,处理10240个请求只需要连接redis一次,显然对于一个请求连接一次redis的开销要小N倍,压测效果也如上所述,从原本的每秒处理几千次请求一下子涨到了1W多次请求。

压测指令也贴上吧,其实还是有许多人不知道的

ab -n 100000 -c 200 要压测的url  // 并发200个客户端请求100000次要压测的url

当然要验证nginx是否真的只开了400个php-fpm,可以用以下代码验证

$pid = getmypid();
touch("pids/".$pid);

执行上面的压测指令,看看pids目录下是不是生成了400个以当前php-fpm进程号为名称的文件,当然不是刚好400个,因为还有一两个是manager进程嘛,呵呵。

压测效果很理想,那是不是问题就解决了呢,当然,PHP没你想的那么美好,举个例子,Mysql的长连接默认是8小时,redis没有了解过,我们就当他也8小时,那8小时一个php-fpm处理10240个请求肯定早就处理完啦,那这个长连接还不关闭,又不能重用,随着时间的推移,连接数不是只增不减,总有一天会内存溢出?

有人可能会说,我测一下10240个请求需要多长时间,redis长连接的超时时间设置接近的数字就好,问题是网络环境哪有想象那么美好,假如网络不好造成10240个请求还没处理完连接超时了呢?加入网络太好处理了好几轮10240次请求连接越来越多了呢?

不过redis好就好在他是单进程单线程IO多路复用的,所以连接一直不关闭也不会有什么大的影响,Mysql是多线程的,线程开多了不关,问题肯定还是比较严重的。

那有没有办法可以弄一个来管理这些长连接的,让他一直不要关闭,用完就给另一个新开的php-fpm,答案就是连接池。

网上照抄一下原理:

    连接池基本的思想是在系统初始化的时候,将数据库连接作为对象存储在内存中,当用户需要访问数据库时,并非建立一个新的连接,而是从连接池中取出一个已建立的空闲连接对象。使用完毕后,用户也并非将连接关闭,而是将连接放回连接池中,以供下一个请求访问使用。而连接的建立、断开都由连接池自身来管理。同时,还可以通过设置连接池的参数来控制连接池中的初始连接数、连接的上下限数以及每个连接的最大使用次数、最大空闲时间等等。也可以通过其自身的管理机制来监视数据库连接的数量、使用情况等。

按照上面的思想设计一个连接池显然对PHP是莫大的挑战,因为php-fpm之间内存是不共享的,于是我选择了Go语言来干这事,原因很简单,Go的写法和PHP一样简单,java太复杂配置环境也很麻烦,最后被我抛弃了,也不能说抛弃吧,太菜了用不来。

Go连接redis有很多库,最终选择了github.com/garyburd/redigo/redis,然后参考网上的文章写了一个连接池的例子:

package main
import (
    "net/http"
    "runtime"
    "io"
    "fmt"
    "log"
    "time"
    "github.com/garyburd/redigo/redis"
)
// 连接池大小
var MAX_POOL_SIZE = 20
var redisPoll chan redis.Conn
func putRedis(conn redis.Conn) {
    // 基于函数和接口间互不信任原则,这里再判断一次,养成这个好习惯哦
    if redisPoll == nil {
        redisPoll = make(chan redis.Conn, MAX_POOL_SIZE)
    }
    if len(redisPoll) >= MAX_POOL_SIZE {
        conn.Close()
        return
    }
    redisPoll <- conn
}
func InitRedis(network, address string) redis.Conn {
    // 缓冲机制,相当于消息队列
    if len(redisPoll) == 0 {
        // 如果长度为0,就定义一个redis.Conn类型长度为MAX_POOL_SIZE的channel
        redisPoll = make(chan redis.Conn, MAX_POOL_SIZE)
        go func() {
            for i := 0; i < MAX_POOL_SIZE/2; i++ {
                c, err := redis.Dial(network, address)
                if err != nil {
                    panic(err)
                }
                putRedis(c)
            }
        } ()
    }
    return <-redisPoll
}
func redisServer(w http.ResponseWriter, r *http.Request) {
    startTime := time.Now()
    c := InitRedis("tcp", "192.168.0.237:6379")
    dbkey := "netgame:info"
    if ok, err := redis.Bool(c.Do("LPUSH", dbkey, "yanetao")); ok {
    } else {
        log.Print(err)
    }
    msg := fmt.Sprintf("用时:%s", time.Now().Sub(startTime));
    io.WriteString(w, msg+"\n\n");
}
func main() {
    // 利用cpu多核来处理http请求,这个没有用go默认就是单核处理http的,这个压测过了,请一定要相信我
    runtime.GOMAXPROCS(runtime.NumCPU());
    http.HandleFunc("/", redisServer);
    http.ListenAndServe(":9527", nil);
}

上面看似实现了连接池,实际压测效果很不理想,不但请求数低,还报错。

请求数底是因为没有解决连接开销,上面是第一次来的时候开一个go协程去连接20/2就是10次redis,然后放到channel里面去,实际上就是队列了,channel就是消息队列,然后当这10个连接被请求用完了,就又生成10个,实际上tcp连接数一个没少,多少个请求就多少个连接数,只是把连接时间片给移了一下,移给刚好10次连接过后那个倒霉蛋。

错误是因为我内网测试,请求太快了,第一波10个连接还没连接好,第二波又来了,没错,又一大波僵尸,然后几波的goroutine就开始互掐,导致连接错误了

还是没有达到连接池的效果。

连接池的概念是先生成默认的连接,例如40个,那么所有请求过来,都是用这40个连接来处理。

当40个连接被用完的时候,要么排队,要么自增多几个来处理。

这40个连接和新生成的连接,假如生成多5个,那么这45个连接是不会关闭的,用完就放回池里,其实也就是队列里,等待其他请求来使用他,达到连接复用的效果。

也就是说这些连接一定是长连接,一直连着不断开。

上面明显是短连接,我试过放到全局变量,每个连接处理六七个请求之后,就断开了,程序提示连接不可用,go要如何达到长连接的效果呢,最后在老外的文章找到了这个库的实现方式,代码如下:

package main
import (
    "net/http"
    "runtime"
    "io"
    "fmt"
    "log"
    "time"
    "github.com/garyburd/redigo/redis"
)
// 重写生成连接池方法
func newPool() *redis.Pool {
    return &redis.Pool{
        MaxIdle: 80,
        MaxActive: 12000, // max number of connections
        Dial: func() (redis.Conn, error) {
            c, err := redis.Dial("tcp", "192.168.0.2:6379")
            if err != nil {
                panic(err.Error())
            }
            return c, err
        },
    }
}
// 生成连接池
var pool = newPool()
func redisServer(w http.ResponseWriter, r *http.Request) {
    startTime := time.Now()
    // 从连接池里面获得一个连接
    c := pool.Get()
    // 连接完关闭,其实没有关闭,是放回池里,也就是队列里面,等待下一个重用
    defer c.Close()
    dbkey := "netgame:info"
    if ok, err := redis.Bool(c.Do("LPUSH", dbkey, "yangzetao")); ok {
    } else {
        log.Print(err)
    }
    msg := fmt.Sprintf("用时:%s", time.Now().Sub(startTime));
    io.WriteString(w, msg+"\n\n");
}
func main() {
    runtime.GOMAXPROCS(runtime.NumCPU());
    http.HandleFunc("/", redisServer);
    http.ListenAndServe(":9527", nil);
}

压测了一下,上面的代码往redis里面插数据跟只是输出字符串HelloWorld到客户端几乎一样快,检查了一下redis里面netgame:info这个队列数据,一条数据都没丢,这才是我真正要的效果啊,好吧,一个牛逼哄哄的go+redis日志系统真的要诞生了,哇哈哈。

参考:

一个老外问把连接放到全局变量搞不定,当然搞不定,短连接会超时的嘛

官方的实现源码

连接池概念


package main

import (
    "fmt"
)

// 声明数据结构
// 大写变量、方法为公有变量和方法。小写为私有。
type User struct {
    Name    string
    Age     uint
    Address string
    Sex     bool
    Money   float64
}

// 为数据结构添加方法
func (u User) Display() string {
    var sex string
    if u.Sex {
        sex = "boy"
    } else {
        sex = "gril"
    }
    return fmt.Sprintf("Name : %s\nAge : %d\nAddress : %s\nSex : %s\nMoney : %.2f", u.Name, u.Age, u.Address, sex, u.Money)
}

func main() {
    // 使用方式1
    var u1 User
    u1.Name = "user1"
    u1.Money = 10245.236
    fmt.Println(u1.Display())
    fmt.Println("===============================")
    // 使用方式2
    u2 := new(User)
    u2.Name = "小明"
    u2.Money = 235423.2
    fmt.Println(u2.Display())
    fmt.Println("===============================")
    // 使用方式3
    u3 := User{"小花", 23, "上海", false, 23134.234}
    fmt.Println(u3.Display())
    fmt.Println("===============================")
    // 使用方式4
    u4 := User{Name: "小李", Money: 235435.3434, Sex: true}
    fmt.Println(u4.Display())
}