stream.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
// Package quantile computes approximate quantiles over an unbounded data
// stream within low memory and CPU bounds.
//
// A small amount of accuracy is traded to achieve the above properties.
//
// Multiple streams can be merged before calling Query to generate a single set
// of results. This is meaningful when the streams represent the same type of
// data. See Merge and Samples.
//
// For more detailed information about the algorithm used, see:
//
// Effective Computation of Biased Quantiles over Data Streams
//
// http://www.cs.rutgers.edu/~muthu/bquant.pdf
  15. package quantile
  16. import (
  17. "math"
  18. "sort"
  19. )
  20. // Sample holds an observed value and meta information for compression. JSON
  21. // tags have been added for convenience.
  22. type Sample struct {
  23. Value float64 `json:",string"`
  24. Width float64 `json:",string"`
  25. Delta float64 `json:",string"`
  26. }
  27. // Samples represents a slice of samples. It implements sort.Interface.
  28. type Samples []Sample
  29. func (a Samples) Len() int { return len(a) }
  30. func (a Samples) Less(i, j int) bool { return a[i].Value < a[j].Value }
  31. func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// invariant is the signature of the error function ƒ stored on a stream. It
// receives the stream and a rank r and returns a float64 bound. In this file
// merge uses the result to derive the Delta of a newly inserted sample,
// compress uses it to decide whether two neighbouring samples may be
// combined, and query uses it to offset the target rank.
type invariant func(s *stream, r float64) float64
  33. // NewLowBiased returns an initialized Stream for low-biased quantiles
  34. // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
  35. // error guarantees can still be given even for the lower ranks of the data
  36. // distribution.
  37. //
  38. // The provided epsilon is a relative error, i.e. the true quantile of a value
  39. // returned by a query is guaranteed to be within (1±Epsilon)*Quantile.
  40. //
  41. // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
  42. // properties.
  43. func NewLowBiased(epsilon float64) *Stream {
  44. ƒ := func(s *stream, r float64) float64 {
  45. return 2 * epsilon * r
  46. }
  47. return newStream(ƒ)
  48. }
  49. // NewHighBiased returns an initialized Stream for high-biased quantiles
  50. // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
  51. // error guarantees can still be given even for the higher ranks of the data
  52. // distribution.
  53. //
  54. // The provided epsilon is a relative error, i.e. the true quantile of a value
  55. // returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).
  56. //
  57. // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
  58. // properties.
  59. func NewHighBiased(epsilon float64) *Stream {
  60. ƒ := func(s *stream, r float64) float64 {
  61. return 2 * epsilon * (s.n - r)
  62. }
  63. return newStream(ƒ)
  64. }
  65. // NewTargeted returns an initialized Stream concerned with a particular set of
  66. // quantile values that are supplied a priori. Knowing these a priori reduces
  67. // space and computation time. The targets map maps the desired quantiles to
  68. // their absolute errors, i.e. the true quantile of a value returned by a query
  69. // is guaranteed to be within (Quantile±Epsilon).
  70. //
  71. // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
  72. func NewTargeted(targetMap map[float64]float64) *Stream {
  73. // Convert map to slice to avoid slow iterations on a map.
  74. // ƒ is called on the hot path, so converting the map to a slice
  75. // beforehand results in significant CPU savings.
  76. targets := targetMapToSlice(targetMap)
  77. ƒ := func(s *stream, r float64) float64 {
  78. var m = math.MaxFloat64
  79. var f float64
  80. for _, t := range targets {
  81. if t.quantile*s.n <= r {
  82. f = (2 * t.epsilon * r) / t.quantile
  83. } else {
  84. f = (2 * t.epsilon * (s.n - r)) / (1 - t.quantile)
  85. }
  86. if f < m {
  87. m = f
  88. }
  89. }
  90. return m
  91. }
  92. return newStream(ƒ)
  93. }
  94. type target struct {
  95. quantile float64
  96. epsilon float64
  97. }
  98. func targetMapToSlice(targetMap map[float64]float64) []target {
  99. targets := make([]target, 0, len(targetMap))
  100. for quantile, epsilon := range targetMap {
  101. t := target{
  102. quantile: quantile,
  103. epsilon: epsilon,
  104. }
  105. targets = append(targets, t)
  106. }
  107. return targets
  108. }
// Stream computes quantiles for a stream of float64s. It is not thread-safe by
// design. Take care when using across multiple goroutines.
type Stream struct {
	// Embedded summary that buffered samples are merged into by flush.
	*stream
	// b buffers inserted samples until flush merges them into the summary.
	b Samples
	// sorted records whether b is known to be sorted: insert clears it and
	// maybeSort sets it after sorting.
	sorted bool
}
  116. func newStream(ƒ invariant) *Stream {
  117. x := &stream{ƒ: ƒ}
  118. return &Stream{x, make(Samples, 0, 500), true}
  119. }
  120. // Insert inserts v into the stream.
  121. func (s *Stream) Insert(v float64) {
  122. s.insert(Sample{Value: v, Width: 1})
  123. }
  124. func (s *Stream) insert(sample Sample) {
  125. s.b = append(s.b, sample)
  126. s.sorted = false
  127. if len(s.b) == cap(s.b) {
  128. s.flush()
  129. }
  130. }
  131. // Query returns the computed qth percentiles value. If s was created with
  132. // NewTargeted, and q is not in the set of quantiles provided a priori, Query
  133. // will return an unspecified result.
  134. func (s *Stream) Query(q float64) float64 {
  135. if !s.flushed() {
  136. // Fast path when there hasn't been enough data for a flush;
  137. // this also yields better accuracy for small sets of data.
  138. l := len(s.b)
  139. if l == 0 {
  140. return 0
  141. }
  142. i := int(math.Ceil(float64(l) * q))
  143. if i > 0 {
  144. i -= 1
  145. }
  146. s.maybeSort()
  147. return s.b[i].Value
  148. }
  149. s.flush()
  150. return s.stream.query(q)
  151. }
  152. // Merge merges samples into the underlying streams samples. This is handy when
  153. // merging multiple streams from separate threads, database shards, etc.
  154. //
  155. // ATTENTION: This method is broken and does not yield correct results. The
  156. // underlying algorithm is not capable of merging streams correctly.
  157. func (s *Stream) Merge(samples Samples) {
  158. sort.Sort(samples)
  159. s.stream.merge(samples)
  160. }
  161. // Reset reinitializes and clears the list reusing the samples buffer memory.
  162. func (s *Stream) Reset() {
  163. s.stream.reset()
  164. s.b = s.b[:0]
  165. }
  166. // Samples returns stream samples held by s.
  167. func (s *Stream) Samples() Samples {
  168. if !s.flushed() {
  169. return s.b
  170. }
  171. s.flush()
  172. return s.stream.samples()
  173. }
  174. // Count returns the total number of samples observed in the stream
  175. // since initialization.
  176. func (s *Stream) Count() int {
  177. return len(s.b) + s.stream.count()
  178. }
  179. func (s *Stream) flush() {
  180. s.maybeSort()
  181. s.stream.merge(s.b)
  182. s.b = s.b[:0]
  183. }
  184. func (s *Stream) maybeSort() {
  185. if !s.sorted {
  186. s.sorted = true
  187. sort.Sort(s.b)
  188. }
  189. }
  190. func (s *Stream) flushed() bool {
  191. return len(s.stream.l) > 0
  192. }
// stream is the summary that Stream flushes its buffered samples into.
type stream struct {
	// n accumulates the Width of every sample passed to merge; reset sets
	// it back to 0.
	n float64
	// l holds the retained samples. merge places each new sample before the
	// first retained sample with a larger Value, and compress combines
	// neighbouring entries.
	l []Sample
	// ƒ is the invariant consulted by merge, query and compress.
	ƒ invariant
}
  198. func (s *stream) reset() {
  199. s.l = s.l[:0]
  200. s.n = 0
  201. }
  202. func (s *stream) insert(v float64) {
  203. s.merge(Samples{{v, 1, 0}})
  204. }
// merge inserts every element of samples into s.l, adds each element's Width
// to s.n, and finally compresses the summary.
//
// The scan position i and the accumulated width r are shared across all
// elements of samples and only ever move forward, so the input is walked
// together with s.l in a single pass. The callers in this file (Stream.Merge
// and Stream.flush) sort samples by Value before calling.
func (s *stream) merge(samples Samples) {
	// TODO(beorn7): This tries to merge not only individual samples, but
	// whole summaries. The paper doesn't mention merging summaries at
	// all. Unittests show that the merging is inaccurate. Find out how to
	// do merges properly.
	var r float64 // sum of the widths of all entries before position i
	i := 0
	for _, sample := range samples {
		// Advance to the first retained entry whose Value is larger than
		// the new sample's, accumulating the widths that are skipped.
		for ; i < len(s.l); i++ {
			c := s.l[i]
			if c.Value > sample.Value {
				// Insert at position i.
				// Grow by one and shift the tail right to open a slot.
				s.l = append(s.l, Sample{})
				copy(s.l[i+1:], s.l[i:])
				s.l[i] = Sample{
					sample.Value,
					sample.Width,
					math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
					// TODO(beorn7): How to calculate delta correctly?
				}
				// Step past the entry that was just inserted.
				i++
				goto inserted
			}
			r += c.Width
		}
		// No larger entry remains: append at the end with a delta of 0.
		s.l = append(s.l, Sample{sample.Value, sample.Width, 0})
		i++
	inserted:
		// Common bookkeeping for both insertion paths.
		s.n += sample.Width
		r += sample.Width
	}
	s.compress()
}
  238. func (s *stream) count() int {
  239. return int(s.n)
  240. }
  241. func (s *stream) query(q float64) float64 {
  242. t := math.Ceil(q * s.n)
  243. t += math.Ceil(s.ƒ(s, t) / 2)
  244. p := s.l[0]
  245. var r float64
  246. for _, c := range s.l[1:] {
  247. r += p.Width
  248. if r+c.Width+c.Delta > t {
  249. return p.Value
  250. }
  251. p = c
  252. }
  253. return p.Value
  254. }
// compress walks s.l from the back to the front and folds a sample into its
// right-hand neighbour whenever the combined Width plus the neighbour's Delta
// does not exceed the invariant's bound for the current rank r. Folding adds
// the sample's Width to the neighbour and deletes the sample from s.l.
// Summaries with fewer than two samples are left untouched.
func (s *stream) compress() {
	if len(s.l) < 2 {
		return
	}
	// x is the current merge target (initially the last sample) and xi is
	// its index in s.l.
	x := s.l[len(s.l)-1]
	xi := len(s.l) - 1
	// r is the rank handed to the invariant; it is lowered by the width of
	// every sample visited below.
	r := s.n - 1 - x.Width
	for i := len(s.l) - 2; i >= 0; i-- {
		c := s.l[i]
		if c.Width+x.Width+x.Delta <= s.ƒ(s, r) {
			// Fold c into x and write the widened x back.
			x.Width += c.Width
			s.l[xi] = x
			// Remove element at i.
			copy(s.l[i:], s.l[i+1:])
			s.l = s.l[:len(s.l)-1]
			// Everything after i moved one slot to the left, x included.
			xi -= 1
		} else {
			// c cannot be folded; it becomes the new merge target.
			x = c
			xi = i
		}
		r -= c.Width
	}
}
  278. func (s *stream) samples() Samples {
  279. samples := make(Samples, len(s.l))
  280. copy(samples, s.l)
  281. return samples
  282. }