12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782 |
- package pq
- import (
- "errors"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- )
// Notification is one asynchronous notification message as decoded by
// recvNotification.
type Notification struct {
	// BePid is the int32 that leads the notification message; going by the
	// name this is the process ID of the notifying backend (TODO confirm).
	BePid int

	// Channel is the first string in the message, presumably the name of the
	// channel the notification was delivered on.
	Channel string

	// Extra is the second string in the message, presumably the payload
	// attached to the notification.
	Extra string
}
- func recvNotification(r *readBuf) *Notification {
- bePid := r.int32()
- channel := r.string()
- extra := r.string()
- return &Notification{bePid, channel, extra}
- }
// States of the ListenerConn query state machine. setState only permits the
// cycle Idle -> ExpectResponse -> ExpectReadyForQuery -> Idle.
const (
	// connStateIdle: no query is in flight.
	connStateIdle int32 = 0
	// connStateExpectResponse: a query has been sent and its response is
	// still outstanding.
	connStateExpectResponse int32 = 1
	// connStateExpectReadyForQuery: the response has been seen and only the
	// terminating ReadyForQuery is outstanding.
	connStateExpectReadyForQuery int32 = 2
)
// message is what the reader goroutine hands to a waiting query over
// ListenerConn.replyChan.
type message struct {
	// typ is the protocol message type byte ('E' or 'Z' in this file).
	typ byte
	// err carries the parsed server error for 'E' messages and is nil for 'Z'.
	err error
}
// errListenerConnClosed is stored in ListenerConn.err by Close and returned
// by Close when the connection had already been shut down.
var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
- type ListenerConn struct {
-
- connectionLock sync.Mutex
- cn *conn
- err error
- connState int32
-
- senderLock sync.Mutex
- notificationChan chan<- *Notification
- replyChan chan message
- }
- func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
- return newDialListenerConn(defaultDialer{}, name, notificationChan)
- }
- func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
- cn, err := DialOpen(d, name)
- if err != nil {
- return nil, err
- }
- l := &ListenerConn{
- cn: cn.(*conn),
- notificationChan: c,
- connState: connStateIdle,
- replyChan: make(chan message, 2),
- }
- go l.listenerConnMain()
- return l, nil
- }
- func (l *ListenerConn) acquireSenderLock() error {
-
- l.senderLock.Lock()
- l.connectionLock.Lock()
- err := l.err
- l.connectionLock.Unlock()
- if err != nil {
- l.senderLock.Unlock()
- return err
- }
- return nil
- }
- func (l *ListenerConn) releaseSenderLock() {
- l.senderLock.Unlock()
- }
- func (l *ListenerConn) setState(newState int32) bool {
- var expectedState int32
- switch newState {
- case connStateIdle:
- expectedState = connStateExpectReadyForQuery
- case connStateExpectResponse:
- expectedState = connStateIdle
- case connStateExpectReadyForQuery:
- expectedState = connStateExpectResponse
- default:
- panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
- }
- return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
- }
- func (l *ListenerConn) listenerConnLoop() (err error) {
- defer errRecoverNoErrBadConn(&err)
- r := &readBuf{}
- for {
- t, err := l.cn.recvMessage(r)
- if err != nil {
- return err
- }
- switch t {
- case 'A':
-
-
- l.notificationChan <- recvNotification(r)
- case 'T', 'D':
-
- case 'E':
-
-
-
-
- if !l.setState(connStateExpectReadyForQuery) {
- return parseError(r)
- }
- l.replyChan <- message{t, parseError(r)}
- case 'C', 'I':
- if !l.setState(connStateExpectReadyForQuery) {
-
- return fmt.Errorf("unexpected CommandComplete")
- }
-
- case 'Z':
- if !l.setState(connStateIdle) {
-
- return fmt.Errorf("unexpected ReadyForQuery")
- }
- l.replyChan <- message{t, nil}
- case 'N', 'S':
-
- default:
- return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
- }
- }
- }
- func (l *ListenerConn) listenerConnMain() {
- err := l.listenerConnLoop()
-
-
-
-
-
-
-
-
-
-
- l.connectionLock.Lock()
- if l.err == nil {
- l.err = err
- }
- l.cn.Close()
- l.connectionLock.Unlock()
-
-
- close(l.replyChan)
-
- close(l.notificationChan)
-
- }
- func (l *ListenerConn) Listen(channel string) (bool, error) {
- return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
- }
- func (l *ListenerConn) Unlisten(channel string) (bool, error) {
- return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
- }
- func (l *ListenerConn) UnlistenAll() (bool, error) {
- return l.ExecSimpleQuery("UNLISTEN *")
- }
- func (l *ListenerConn) Ping() error {
- sent, err := l.ExecSimpleQuery("")
- if !sent {
- return err
- }
- if err != nil {
-
- panic(err)
- }
- return nil
- }
- func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
- defer errRecoverNoErrBadConn(&err)
-
- if !l.setState(connStateExpectResponse) {
- panic("two queries running at the same time")
- }
-
-
- b := &writeBuf{
- buf: []byte("Q\x00\x00\x00\x00"),
- pos: 1,
- }
- b.string(q)
- l.cn.send(b)
- return nil
- }
- func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
- if err = l.acquireSenderLock(); err != nil {
- return false, err
- }
- defer l.releaseSenderLock()
- err = l.sendSimpleQuery(q)
- if err != nil {
-
-
- l.connectionLock.Lock()
-
-
- if l.err == nil {
- l.err = err
- }
- l.connectionLock.Unlock()
- l.cn.c.Close()
- return false, err
- }
-
- for {
- m, ok := <-l.replyChan
- if !ok {
-
-
- l.connectionLock.Lock()
- err := l.err
- l.connectionLock.Unlock()
- return false, err
- }
- switch m.typ {
- case 'Z':
-
- if m.err != nil {
- panic("m.err != nil")
- }
-
- return true, err
- case 'E':
-
- if m.err == nil {
- panic("m.err == nil")
- }
-
- err = m.err
- default:
- return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
- }
- }
- }
- func (l *ListenerConn) Close() error {
- l.connectionLock.Lock()
- if l.err != nil {
- l.connectionLock.Unlock()
- return errListenerConnClosed
- }
- l.err = errListenerConnClosed
- l.connectionLock.Unlock()
-
-
- return l.cn.c.Close()
- }
- func (l *ListenerConn) Err() error {
- return l.err
- }
var (
	// errListenerClosed is returned by Listener methods once the Listener
	// has been closed.
	errListenerClosed = errors.New("pq: Listener has been closed")

	// ErrChannelAlreadyOpen is returned from Listen when the channel is
	// already in the Listener's channel set.
	ErrChannelAlreadyOpen = errors.New("pq: channel is already open")

	// ErrChannelNotOpen is returned from Unlisten when the channel is not in
	// the Listener's channel set.
	ErrChannelNotOpen = errors.New("pq: channel is not open")
)
// ListenerEventType enumerates the connection events a Listener reports
// through its event callback.
type ListenerEventType int

const (
	// ListenerEventConnected is emitted once, after the first successful
	// connection. The callback's err argument is nil.
	ListenerEventConnected ListenerEventType = iota

	// ListenerEventDisconnected is emitted after an established connection
	// has been lost. The callback's err argument is the error recorded for
	// the lost connection.
	ListenerEventDisconnected

	// ListenerEventReconnected is emitted after every successful connection
	// other than the first. The callback's err argument is nil.
	ListenerEventReconnected

	// ListenerEventConnectionAttemptFailed is emitted when a connection
	// attempt fails; the Listener then sleeps and retries. The callback's
	// err argument is the connection error.
	ListenerEventConnectionAttemptFailed
)

// EventCallbackType is the signature of the callback a Listener invokes for
// each connection event.
type EventCallbackType func(event ListenerEventType, err error)
- type Listener struct {
-
-
- Notify chan *Notification
- name string
- minReconnectInterval time.Duration
- maxReconnectInterval time.Duration
- dialer Dialer
- eventCallback EventCallbackType
- lock sync.Mutex
- isClosed bool
- reconnectCond *sync.Cond
- cn *ListenerConn
- connNotificationChan <-chan *Notification
- channels map[string]struct{}
- }
- func NewListener(name string,
- minReconnectInterval time.Duration,
- maxReconnectInterval time.Duration,
- eventCallback EventCallbackType) *Listener {
- return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
- }
- func NewDialListener(d Dialer,
- name string,
- minReconnectInterval time.Duration,
- maxReconnectInterval time.Duration,
- eventCallback EventCallbackType) *Listener {
- l := &Listener{
- name: name,
- minReconnectInterval: minReconnectInterval,
- maxReconnectInterval: maxReconnectInterval,
- dialer: d,
- eventCallback: eventCallback,
- channels: make(map[string]struct{}),
- Notify: make(chan *Notification, 32),
- }
- l.reconnectCond = sync.NewCond(&l.lock)
- go l.listenerMain()
- return l
- }
- func (l *Listener) NotificationChannel() <-chan *Notification {
- return l.Notify
- }
- func (l *Listener) Listen(channel string) error {
- l.lock.Lock()
- defer l.lock.Unlock()
- if l.isClosed {
- return errListenerClosed
- }
-
-
-
-
- _, exists := l.channels[channel]
- if exists {
- return ErrChannelAlreadyOpen
- }
- if l.cn != nil {
-
-
-
-
-
-
-
- gotResponse, err := l.cn.Listen(channel)
- if gotResponse && err != nil {
- return err
- }
- }
- l.channels[channel] = struct{}{}
- for l.cn == nil {
- l.reconnectCond.Wait()
-
- if l.isClosed {
- return errListenerClosed
- }
- }
- return nil
- }
- func (l *Listener) Unlisten(channel string) error {
- l.lock.Lock()
- defer l.lock.Unlock()
- if l.isClosed {
- return errListenerClosed
- }
-
-
- _, exists := l.channels[channel]
- if !exists {
- return ErrChannelNotOpen
- }
- if l.cn != nil {
-
-
-
- gotResponse, err := l.cn.Unlisten(channel)
- if gotResponse && err != nil {
- return err
- }
- }
-
- delete(l.channels, channel)
- return nil
- }
- func (l *Listener) UnlistenAll() error {
- l.lock.Lock()
- defer l.lock.Unlock()
- if l.isClosed {
- return errListenerClosed
- }
- if l.cn != nil {
-
-
-
- gotResponse, err := l.cn.UnlistenAll()
- if gotResponse && err != nil {
- return err
- }
- }
-
- l.channels = make(map[string]struct{})
- return nil
- }
- func (l *Listener) Ping() error {
- l.lock.Lock()
- defer l.lock.Unlock()
- if l.isClosed {
- return errListenerClosed
- }
- if l.cn == nil {
- return errors.New("no connection")
- }
- return l.cn.Ping()
- }
- func (l *Listener) disconnectCleanup() error {
- l.lock.Lock()
- defer l.lock.Unlock()
-
- select {
- case _, ok := <-l.connNotificationChan:
- if ok {
- panic("connNotificationChan not closed")
- }
- default:
- panic("connNotificationChan not closed")
- }
- err := l.cn.Err()
- l.cn.Close()
- l.cn = nil
- return err
- }
- func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
- doneChan := make(chan error)
- go func() {
- for channel := range l.channels {
-
-
- gotResponse, err := cn.Listen(channel)
- if gotResponse && err != nil {
- doneChan <- err
- return
- }
-
-
-
- if err != nil {
- for _ = range notificationChan {
- }
- doneChan <- cn.Err()
- return
- }
- }
- doneChan <- nil
- }()
-
-
-
-
-
- for {
- select {
- case _, ok := <-notificationChan:
- if !ok {
- notificationChan = nil
- }
- case err := <-doneChan:
- return err
- }
- }
- }
- func (l *Listener) closed() bool {
- l.lock.Lock()
- defer l.lock.Unlock()
- return l.isClosed
- }
- func (l *Listener) connect() error {
- notificationChan := make(chan *Notification, 32)
- cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
- if err != nil {
- return err
- }
- l.lock.Lock()
- defer l.lock.Unlock()
- err = l.resync(cn, notificationChan)
- if err != nil {
- cn.Close()
- return err
- }
- l.cn = cn
- l.connNotificationChan = notificationChan
- l.reconnectCond.Broadcast()
- return nil
- }
- func (l *Listener) Close() error {
- l.lock.Lock()
- defer l.lock.Unlock()
- if l.isClosed {
- return errListenerClosed
- }
- if l.cn != nil {
- l.cn.Close()
- }
- l.isClosed = true
- return nil
- }
- func (l *Listener) emitEvent(event ListenerEventType, err error) {
- if l.eventCallback != nil {
- l.eventCallback(event, err)
- }
- }
- func (l *Listener) listenerConnLoop() {
- var nextReconnect time.Time
- reconnectInterval := l.minReconnectInterval
- for {
- for {
- err := l.connect()
- if err == nil {
- break
- }
- if l.closed() {
- return
- }
- l.emitEvent(ListenerEventConnectionAttemptFailed, err)
- time.Sleep(reconnectInterval)
- reconnectInterval *= 2
- if reconnectInterval > l.maxReconnectInterval {
- reconnectInterval = l.maxReconnectInterval
- }
- }
- if nextReconnect.IsZero() {
- l.emitEvent(ListenerEventConnected, nil)
- } else {
- l.emitEvent(ListenerEventReconnected, nil)
- l.Notify <- nil
- }
- reconnectInterval = l.minReconnectInterval
- nextReconnect = time.Now().Add(reconnectInterval)
- for {
- notification, ok := <-l.connNotificationChan
- if !ok {
-
- break
- }
- l.Notify <- notification
- }
- err := l.disconnectCleanup()
- if l.closed() {
- return
- }
- l.emitEvent(ListenerEventDisconnected, err)
- time.Sleep(nextReconnect.Sub(time.Now()))
- }
- }
- func (l *Listener) listenerMain() {
- l.listenerConnLoop()
- close(l.Notify)
- }
|