redis.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package redis
  2. import (
  3. "log"
  4. "net"
  5. "time"
  6. )
  7. type baseClient struct {
  8. connPool pool
  9. opt *options
  10. cmds []Cmder
  11. }
  12. func (c *baseClient) writeCmd(cn *conn, cmds ...Cmder) error {
  13. buf := cn.buf[:0]
  14. for _, cmd := range cmds {
  15. buf = appendArgs(buf, cmd.args())
  16. }
  17. _, err := cn.Write(buf)
  18. return err
  19. }
  20. func (c *baseClient) conn() (*conn, error) {
  21. cn, isNew, err := c.connPool.Get()
  22. if err != nil {
  23. return nil, err
  24. }
  25. if isNew {
  26. if err := c.initConn(cn); err != nil {
  27. c.removeConn(cn)
  28. return nil, err
  29. }
  30. }
  31. return cn, nil
  32. }
  33. func (c *baseClient) initConn(cn *conn) error {
  34. if c.opt.Password == "" && c.opt.DB == 0 {
  35. return nil
  36. }
  37. pool := newSingleConnPool(c.connPool, false)
  38. pool.SetConn(cn)
  39. // Client is not closed because we want to reuse underlying connection.
  40. client := &Client{
  41. baseClient: &baseClient{
  42. opt: c.opt,
  43. connPool: pool,
  44. },
  45. }
  46. if c.opt.Password != "" {
  47. if err := client.Auth(c.opt.Password).Err(); err != nil {
  48. return err
  49. }
  50. }
  51. if c.opt.DB > 0 {
  52. if err := client.Select(c.opt.DB).Err(); err != nil {
  53. return err
  54. }
  55. }
  56. return nil
  57. }
  58. func (c *baseClient) freeConn(cn *conn, ei error) error {
  59. if cn.rd.Buffered() > 0 {
  60. return c.connPool.Remove(cn)
  61. }
  62. if _, ok := ei.(redisError); ok {
  63. return c.connPool.Put(cn)
  64. }
  65. return c.connPool.Remove(cn)
  66. }
  67. func (c *baseClient) removeConn(cn *conn) {
  68. if err := c.connPool.Remove(cn); err != nil {
  69. log.Printf("pool.Remove failed: %s", err)
  70. }
  71. }
  72. func (c *baseClient) putConn(cn *conn) {
  73. if err := c.connPool.Put(cn); err != nil {
  74. log.Printf("pool.Put failed: %s", err)
  75. }
  76. }
  77. func (c *baseClient) Process(cmd Cmder) {
  78. if c.cmds == nil {
  79. c.run(cmd)
  80. } else {
  81. c.cmds = append(c.cmds, cmd)
  82. }
  83. }
  84. func (c *baseClient) run(cmd Cmder) {
  85. cn, err := c.conn()
  86. if err != nil {
  87. cmd.setErr(err)
  88. return
  89. }
  90. if timeout := cmd.writeTimeout(); timeout != nil {
  91. cn.writeTimeout = *timeout
  92. } else {
  93. cn.writeTimeout = c.opt.WriteTimeout
  94. }
  95. if timeout := cmd.readTimeout(); timeout != nil {
  96. cn.readTimeout = *timeout
  97. } else {
  98. cn.readTimeout = c.opt.ReadTimeout
  99. }
  100. if err := c.writeCmd(cn, cmd); err != nil {
  101. c.freeConn(cn, err)
  102. cmd.setErr(err)
  103. return
  104. }
  105. if err := cmd.parseReply(cn.rd); err != nil {
  106. c.freeConn(cn, err)
  107. return
  108. }
  109. c.putConn(cn)
  110. }
  111. // Close closes the client, releasing any open resources.
  112. func (c *baseClient) Close() error {
  113. return c.connPool.Close()
  114. }
  115. //------------------------------------------------------------------------------
  116. type options struct {
  117. Password string
  118. DB int64
  119. DialTimeout time.Duration
  120. ReadTimeout time.Duration
  121. WriteTimeout time.Duration
  122. PoolSize int
  123. IdleTimeout time.Duration
  124. }
  125. type Options struct {
  126. Network string
  127. Addr string
  128. // Dialer creates new network connection and has priority over
  129. // Network and Addr options.
  130. Dialer func() (net.Conn, error)
  131. Password string
  132. DB int64
  133. DialTimeout time.Duration
  134. ReadTimeout time.Duration
  135. WriteTimeout time.Duration
  136. PoolSize int
  137. IdleTimeout time.Duration
  138. }
  139. func (opt *Options) getPoolSize() int {
  140. if opt.PoolSize == 0 {
  141. return 10
  142. }
  143. return opt.PoolSize
  144. }
  145. func (opt *Options) getDialTimeout() time.Duration {
  146. if opt.DialTimeout == 0 {
  147. return 5 * time.Second
  148. }
  149. return opt.DialTimeout
  150. }
  151. func (opt *Options) options() *options {
  152. return &options{
  153. DB: opt.DB,
  154. Password: opt.Password,
  155. DialTimeout: opt.getDialTimeout(),
  156. ReadTimeout: opt.ReadTimeout,
  157. WriteTimeout: opt.WriteTimeout,
  158. PoolSize: opt.getPoolSize(),
  159. IdleTimeout: opt.IdleTimeout,
  160. }
  161. }
  162. type Client struct {
  163. *baseClient
  164. }
  165. func NewClient(clOpt *Options) *Client {
  166. opt := clOpt.options()
  167. dialer := clOpt.Dialer
  168. if dialer == nil {
  169. dialer = func() (net.Conn, error) {
  170. return net.DialTimeout(clOpt.Network, clOpt.Addr, opt.DialTimeout)
  171. }
  172. }
  173. return &Client{
  174. baseClient: &baseClient{
  175. opt: opt,
  176. connPool: newConnPool(newConnFunc(dialer), opt),
  177. },
  178. }
  179. }
  180. // Deprecated. Use NewClient instead.
  181. func NewTCPClient(opt *Options) *Client {
  182. opt.Network = "tcp"
  183. return NewClient(opt)
  184. }
  185. // Deprecated. Use NewClient instead.
  186. func NewUnixClient(opt *Options) *Client {
  187. opt.Network = "unix"
  188. return NewClient(opt)
  189. }