一、并发问题

redigo 官方的文档描述中,Receive() 方法是不支持多并发的,原文为:

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.

Do() 方法是间接调用了Receive() 方法,所以Do() 方法也是不支持多并发的。我们可以用一段代码来验证这一点:

func incr(i int) {
    fmt.Println("Start thread", i)
    err := gCoon.Send("incrby", "nKey", fmt.Sprintf("%d", i))
    checkErr(err)
    err = gCoon.Flush()
    checkErr(err)
    time.Sleep(time.Second * time.Duration(i))
    rs, err := redis.Int(gCoon.Receive())
    checkErr(err)
    fmt.Printf("Thread %d Incr success, result is %d", i, rs)
}

这里是一个函数,完成了一个简单的INCRBY 命令,实现nKey + i 功能,和正常情况不同的是在Send()Flush() 后不会立刻使用Receive() 来获取结果,而是让线程休眠一段时间再来获取。

主函数中开启两个gorutine运行这段代码:

go incr(5)
go incr(1)

运行结果:

Start thread 5
Start thread 1
Thread 1 Incr success, result is 5
Thread 5 Incr success, result is 6

可以看到,线程5 先运行,然后线程1 运行,由于线程1 休眠时间短,所以它会先读取输入缓冲区的返回数据,按照预期,线程1 读到的结果应该是1 ,因为它只是执行了incr nKey 1。 而实际上,它读到的却是线程5 的结果。

从这里我们可以很明显看出这一过程是线程不安全的,即它是不支持多并发的。那么如果要实现并发应该怎么做呢,官方也提出了解决方法,使用线程安全的连接池 来解决这个问题:

For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

二、连接池

redigo 中的线程池是一个对象:

type Pool struct {
    Dial func() (Conn, error) // 连接redis的函数
    TestOnBorrow func(c Conn, t time.Time) error  // 测试空闲线程是否正常运行的函数
    MaxIdle int  //最大的空闲连接数,可以看作是最大连接数
    MaxActive int  //最大的活跃连接数,默认为0不限制
    IdleTimeout time.Duration  //连接超时时间,默认为0表示不做超时限制
    Wait bool  //当连接超出数量先之后,是否等待到空闲连接释放
}

一个连接池创建的示例为:

func newPool(addr string) *redis.Pool {
  return &redis.Pool{
    MaxIdle: 3,
    IdleTimeout: 240 * time.Second,
    Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
  }
}
var (
  pool *redis.Pool
  redisServer = flag.String("redisServer", ":6379", "")
)
func main() {
  flag.Parse()
  pool = newPool(*redisServer)
  ...
}

当连接池创建完毕之后,如果需要使用连接时调用pool.Get() 方法即可获得一个可用的连接,此时再执行Do() 等方法时就不会被其他并发干扰,要注意的是每获取到一个可用的连接并且在使用完之后,一定要通过conn.Close() 来主动释放连接,以方便下一个应用调用,不然该连接将会一直被占用。

同时官方提供了一个NewPool() 来创建连接池,但是这个方法是不推荐的:

// NewPool creates a new pool.
//
// Deprecated: Initialize the Pool directory as shown in the example.
func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
    return &Pool{Dial: newFn, MaxIdle: maxIdle}
}

三、示例代码

一个完整的示例代码:

package main
import (
    "github.com/garyburd/redigo/redis"
    "fmt"
)
//连接池的连接到服务的函数
func newPoolFunc()(redis.Conn, error){
    return redis.Dial("tcp", ":6379")
}
//生成一个连接池对象
func newPool()(* redis.Pool){
    return &redis.Pool{
        MaxIdle: 10,
        Dial: newPoolFunc,
        Wait: true,
    }
}
//错误检测
func checkErr(err error){
    if err != nil{
        panic(err)
    }
}
func main(){
    pool := newPool()
    conn := pool.Get()
    defer conn.Close()
    rs, err := redis.Strings(conn.Do("KEYS", "*"))
    checkErr(err)
    fmt.Println(rs)  // [nKey]
    fmt.Println(pool.ActiveCount())  // 1
    conn.Close()
}
最后修改:2017 年 11 月 29 日
如果觉得我的文章对你有用,请随意赞赏