123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- package redis
- import (
- "container/list"
- "errors"
- "log"
- "net"
- "sync"
- "time"
- "gopkg.in/bufio.v1"
- )
// Sentinel errors returned by the pools in this file.
var (
	// errClosed is returned by pool operations once the pool has been closed.
	errClosed = errors.New("redis: client is closed")

	// errRateLimited is returned by connPool.new when the rate limiter
	// check fails.
	errRateLimited = errors.New("redis: you open connections too fast")
)

// zeroTime is the zero time.Time value; conn passes it to the deadline
// setters when no timeout is configured.
var zeroTime time.Time
- type pool interface {
- Get() (*conn, bool, error)
- Put(*conn) error
- Remove(*conn) error
- Len() int
- Size() int
- Close() error
- Filter(func(*conn) bool)
- }
- //------------------------------------------------------------------------------
- type conn struct {
- netcn net.Conn
- rd *bufio.Reader
- buf []byte
- inUse bool
- usedAt time.Time
- readTimeout time.Duration
- writeTimeout time.Duration
- elem *list.Element
- }
- func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {
- return func() (*conn, error) {
- netcn, err := dial()
- if err != nil {
- return nil, err
- }
- cn := &conn{
- netcn: netcn,
- buf: make([]byte, 0, 64),
- }
- cn.rd = bufio.NewReader(cn)
- return cn, nil
- }
- }
- func (cn *conn) Read(b []byte) (int, error) {
- if cn.readTimeout != 0 {
- cn.netcn.SetReadDeadline(time.Now().Add(cn.readTimeout))
- } else {
- cn.netcn.SetReadDeadline(zeroTime)
- }
- return cn.netcn.Read(b)
- }
- func (cn *conn) Write(b []byte) (int, error) {
- if cn.writeTimeout != 0 {
- cn.netcn.SetWriteDeadline(time.Now().Add(cn.writeTimeout))
- } else {
- cn.netcn.SetWriteDeadline(zeroTime)
- }
- return cn.netcn.Write(b)
- }
- func (cn *conn) RemoteAddr() net.Addr {
- return cn.netcn.RemoteAddr()
- }
- func (cn *conn) Close() error {
- return cn.netcn.Close()
- }
- //------------------------------------------------------------------------------
- type connPool struct {
- dial func() (*conn, error)
- rl *rateLimiter
- opt *options
- cond *sync.Cond
- conns *list.List
- idleNum int
- closed bool
- }
- func newConnPool(dial func() (*conn, error), opt *options) *connPool {
- return &connPool{
- dial: dial,
- rl: newRateLimiter(time.Second, 2*opt.PoolSize),
- opt: opt,
- cond: sync.NewCond(&sync.Mutex{}),
- conns: list.New(),
- }
- }
- func (p *connPool) new() (*conn, error) {
- if !p.rl.Check() {
- return nil, errRateLimited
- }
- return p.dial()
- }
- func (p *connPool) Get() (*conn, bool, error) {
- p.cond.L.Lock()
- if p.closed {
- p.cond.L.Unlock()
- return nil, false, errClosed
- }
- if p.opt.IdleTimeout > 0 {
- for el := p.conns.Front(); el != nil; el = el.Next() {
- cn := el.Value.(*conn)
- if cn.inUse {
- break
- }
- if time.Since(cn.usedAt) > p.opt.IdleTimeout {
- if err := p.remove(cn); err != nil {
- log.Printf("remove failed: %s", err)
- }
- }
- }
- }
- for p.conns.Len() >= p.opt.PoolSize && p.idleNum == 0 {
- p.cond.Wait()
- }
- if p.idleNum > 0 {
- elem := p.conns.Front()
- cn := elem.Value.(*conn)
- if cn.inUse {
- panic("pool: precondition failed")
- }
- cn.inUse = true
- p.conns.MoveToBack(elem)
- p.idleNum--
- p.cond.L.Unlock()
- return cn, false, nil
- }
- if p.conns.Len() < p.opt.PoolSize {
- cn, err := p.new()
- if err != nil {
- p.cond.L.Unlock()
- return nil, false, err
- }
- cn.inUse = true
- cn.elem = p.conns.PushBack(cn)
- p.cond.L.Unlock()
- return cn, true, nil
- }
- panic("not reached")
- }
- func (p *connPool) Put(cn *conn) error {
- if cn.rd.Buffered() != 0 {
- b, _ := cn.rd.ReadN(cn.rd.Buffered())
- log.Printf("redis: connection has unread data: %q", b)
- return p.Remove(cn)
- }
- if p.opt.IdleTimeout > 0 {
- cn.usedAt = time.Now()
- }
- p.cond.L.Lock()
- if p.closed {
- p.cond.L.Unlock()
- return errClosed
- }
- cn.inUse = false
- p.conns.MoveToFront(cn.elem)
- p.idleNum++
- p.cond.Signal()
- p.cond.L.Unlock()
- return nil
- }
- func (p *connPool) Remove(cn *conn) error {
- p.cond.L.Lock()
- if p.closed {
- // Noop, connection is already closed.
- p.cond.L.Unlock()
- return nil
- }
- err := p.remove(cn)
- p.cond.Signal()
- p.cond.L.Unlock()
- return err
- }
- func (p *connPool) remove(cn *conn) error {
- p.conns.Remove(cn.elem)
- cn.elem = nil
- if !cn.inUse {
- p.idleNum--
- }
- return cn.Close()
- }
- // Len returns number of idle connections.
- func (p *connPool) Len() int {
- defer p.cond.L.Unlock()
- p.cond.L.Lock()
- return p.idleNum
- }
- // Size returns number of connections in the pool.
- func (p *connPool) Size() int {
- defer p.cond.L.Unlock()
- p.cond.L.Lock()
- return p.conns.Len()
- }
- func (p *connPool) Filter(f func(*conn) bool) {
- p.cond.L.Lock()
- for el, next := p.conns.Front(), p.conns.Front(); el != nil; el = next {
- next = el.Next()
- cn := el.Value.(*conn)
- if !f(cn) {
- p.remove(cn)
- }
- }
- p.cond.L.Unlock()
- }
- func (p *connPool) Close() error {
- defer p.cond.L.Unlock()
- p.cond.L.Lock()
- if p.closed {
- return nil
- }
- p.closed = true
- p.rl.Close()
- var retErr error
- for {
- e := p.conns.Front()
- if e == nil {
- break
- }
- if err := p.remove(e.Value.(*conn)); err != nil {
- log.Printf("cn.Close failed: %s", err)
- retErr = err
- }
- }
- return retErr
- }
- //------------------------------------------------------------------------------
- type singleConnPool struct {
- pool pool
- cnMtx sync.Mutex
- cn *conn
- reusable bool
- closed bool
- }
- func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
- return &singleConnPool{
- pool: pool,
- reusable: reusable,
- }
- }
- func (p *singleConnPool) SetConn(cn *conn) {
- p.cnMtx.Lock()
- p.cn = cn
- p.cnMtx.Unlock()
- }
- func (p *singleConnPool) Get() (*conn, bool, error) {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
- if p.closed {
- return nil, false, errClosed
- }
- if p.cn != nil {
- return p.cn, false, nil
- }
- cn, isNew, err := p.pool.Get()
- if err != nil {
- return nil, false, err
- }
- p.cn = cn
- return p.cn, isNew, nil
- }
- func (p *singleConnPool) Put(cn *conn) error {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
- if p.cn != cn {
- panic("p.cn != cn")
- }
- if p.closed {
- return errClosed
- }
- return nil
- }
- func (p *singleConnPool) put() error {
- err := p.pool.Put(p.cn)
- p.cn = nil
- return err
- }
- func (p *singleConnPool) Remove(cn *conn) error {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
- if p.cn == nil {
- panic("p.cn == nil")
- }
- if p.cn != cn {
- panic("p.cn != cn")
- }
- if p.closed {
- return errClosed
- }
- return p.remove()
- }
- func (p *singleConnPool) remove() error {
- err := p.pool.Remove(p.cn)
- p.cn = nil
- return err
- }
- func (p *singleConnPool) Len() int {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
- if p.cn == nil {
- return 0
- }
- return 1
- }
- func (p *singleConnPool) Size() int {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
- if p.cn == nil {
- return 0
- }
- return 1
- }
- func (p *singleConnPool) Filter(f func(*conn) bool) {
- p.cnMtx.Lock()
- if p.cn != nil {
- if !f(p.cn) {
- p.remove()
- }
- }
- p.cnMtx.Unlock()
- }
- func (p *singleConnPool) Close() error {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
- if p.closed {
- return nil
- }
- p.closed = true
- var err error
- if p.cn != nil {
- if p.reusable {
- err = p.put()
- } else {
- err = p.remove()
- }
- }
- return err
- }
|