unbounded_executor.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package concurrent
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "runtime/debug"
  7. "sync"
  8. "time"
  9. "reflect"
  10. )
  11. // HandlePanic logs goroutine panic by default
  12. var HandlePanic = func(recovered interface{}, funcName string) {
  13. ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
  14. ErrorLogger.Println(string(debug.Stack()))
  15. }
  // UnboundedExecutor starts goroutines without any limit on how many may be
  // alive at once. It keeps a count of the goroutines it has started so that
  // they can be cancelled, and optionally waited for, on shutdown.
  type UnboundedExecutor struct {
  	// ctx is handed to every handler; cancel cancels it.
  	ctx    context.Context
  	cancel context.CancelFunc

  	// activeGoroutinesMutex guards activeGoroutines.
  	activeGoroutinesMutex *sync.Mutex
  	// activeGoroutines counts running goroutines, keyed by the
  	// "file:line" position of the handler they were started with.
  	activeGoroutines map[string]int

  	// HandlePanic, when non-nil, is called instead of the package-level
  	// HandlePanic for panics recovered from this executor's goroutines.
  	HandlePanic func(recovered interface{}, funcName string)
  }
  25. // GlobalUnboundedExecutor has the life cycle of the program itself
  26. // any goroutine want to be shutdown before main exit can be started from this executor
  27. // GlobalUnboundedExecutor expects the main function to call stop
  28. // it does not magically knows the main function exits
  29. var GlobalUnboundedExecutor = NewUnboundedExecutor()
  30. // NewUnboundedExecutor creates a new UnboundedExecutor,
  31. // UnboundedExecutor can not be created by &UnboundedExecutor{}
  32. // HandlePanic can be set with a callback to override global HandlePanic
  33. func NewUnboundedExecutor() *UnboundedExecutor {
  34. ctx, cancel := context.WithCancel(context.TODO())
  35. return &UnboundedExecutor{
  36. ctx: ctx,
  37. cancel: cancel,
  38. activeGoroutinesMutex: &sync.Mutex{},
  39. activeGoroutines: map[string]int{},
  40. }
  41. }
  42. // Go starts a new goroutine and tracks its lifecycle.
  43. // Panic will be recovered and logged automatically, except for StopSignal
  44. func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
  45. pc := reflect.ValueOf(handler).Pointer()
  46. f := runtime.FuncForPC(pc)
  47. funcName := f.Name()
  48. file, line := f.FileLine(pc)
  49. executor.activeGoroutinesMutex.Lock()
  50. defer executor.activeGoroutinesMutex.Unlock()
  51. startFrom := fmt.Sprintf("%s:%d", file, line)
  52. executor.activeGoroutines[startFrom] += 1
  53. go func() {
  54. defer func() {
  55. recovered := recover()
  56. // if you want to quit a goroutine without trigger HandlePanic
  57. // use runtime.Goexit() to quit
  58. if recovered != nil {
  59. if executor.HandlePanic == nil {
  60. HandlePanic(recovered, funcName)
  61. } else {
  62. executor.HandlePanic(recovered, funcName)
  63. }
  64. }
  65. executor.activeGoroutinesMutex.Lock()
  66. executor.activeGoroutines[startFrom] -= 1
  67. executor.activeGoroutinesMutex.Unlock()
  68. }()
  69. handler(executor.ctx)
  70. }()
  71. }
  72. // Stop cancel all goroutines started by this executor without wait
  73. func (executor *UnboundedExecutor) Stop() {
  74. executor.cancel()
  75. }
  76. // StopAndWaitForever cancel all goroutines started by this executor and
  77. // wait until all goroutines exited
  78. func (executor *UnboundedExecutor) StopAndWaitForever() {
  79. executor.StopAndWait(context.Background())
  80. }
  81. // StopAndWait cancel all goroutines started by this executor and wait.
  82. // Wait can be cancelled by the context passed in.
  83. func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
  84. executor.cancel()
  85. for {
  86. oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
  87. select {
  88. case <-oneHundredMilliseconds.C:
  89. if executor.checkNoActiveGoroutines() {
  90. return
  91. }
  92. case <-ctx.Done():
  93. return
  94. }
  95. }
  96. }
  97. func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
  98. executor.activeGoroutinesMutex.Lock()
  99. defer executor.activeGoroutinesMutex.Unlock()
  100. for startFrom, count := range executor.activeGoroutines {
  101. if count > 0 {
  102. InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
  103. "startFrom", startFrom,
  104. "count", count)
  105. return false
  106. }
  107. }
  108. return true
  109. }