token.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754
  1. package mssql
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "io"
  6. "net"
  7. "strconv"
  8. "strings"
  9. "golang.org/x/net/context"
  10. )
  11. //go:generate stringer -type token
  12. type token byte
  13. // token ids
  14. const (
  15. tokenReturnStatus token = 121 // 0x79
  16. tokenColMetadata token = 129 // 0x81
  17. tokenOrder token = 169 // 0xA9
  18. tokenError token = 170 // 0xAA
  19. tokenInfo token = 171 // 0xAB
  20. tokenLoginAck token = 173 // 0xad
  21. tokenRow token = 209 // 0xd1
  22. tokenNbcRow token = 210 // 0xd2
  23. tokenEnvChange token = 227 // 0xE3
  24. tokenSSPI token = 237 // 0xED
  25. tokenDone token = 253 // 0xFD
  26. tokenDoneProc token = 254
  27. tokenDoneInProc token = 255
  28. )
  29. // done flags
  30. // https://msdn.microsoft.com/en-us/library/dd340421.aspx
  31. const (
  32. doneFinal = 0
  33. doneMore = 1
  34. doneError = 2
  35. doneInxact = 4
  36. doneCount = 0x10
  37. doneAttn = 0x20
  38. doneSrvError = 0x100
  39. )
  40. // ENVCHANGE types
  41. // http://msdn.microsoft.com/en-us/library/dd303449.aspx
  42. const (
  43. envTypDatabase = 1
  44. envTypLanguage = 2
  45. envTypCharset = 3
  46. envTypPacketSize = 4
  47. envSortId = 5
  48. envSortFlags = 6
  49. envSqlCollation = 7
  50. envTypBeginTran = 8
  51. envTypCommitTran = 9
  52. envTypRollbackTran = 10
  53. envEnlistDTC = 11
  54. envDefectTran = 12
  55. envDatabaseMirrorPartner = 13
  56. envPromoteTran = 15
  57. envTranMgrAddr = 16
  58. envTranEnded = 17
  59. envResetConnAck = 18
  60. envStartedInstanceName = 19
  61. envRouting = 20
  62. )
  63. // COLMETADATA flags
  64. // https://msdn.microsoft.com/en-us/library/dd357363.aspx
  65. const (
  66. colFlagNullable = 1
  67. // TODO implement more flags
  68. )
  69. // interface for all tokens
  70. type tokenStruct interface{}
  71. type orderStruct struct {
  72. ColIds []uint16
  73. }
  74. type doneStruct struct {
  75. Status uint16
  76. CurCmd uint16
  77. RowCount uint64
  78. errors []Error
  79. }
  80. func (d doneStruct) isError() bool {
  81. return d.Status&doneError != 0 || len(d.errors) > 0
  82. }
  83. func (d doneStruct) getError() Error {
  84. if len(d.errors) > 0 {
  85. return d.errors[len(d.errors)-1]
  86. } else {
  87. return Error{Message: "Request failed but didn't provide reason"}
  88. }
  89. }
  90. type doneInProcStruct doneStruct
  91. var doneFlags2str = map[uint16]string{
  92. doneFinal: "final",
  93. doneMore: "more",
  94. doneError: "error",
  95. doneInxact: "inxact",
  96. doneCount: "count",
  97. doneAttn: "attn",
  98. doneSrvError: "srverror",
  99. }
  100. func doneFlags2Str(flags uint16) string {
  101. strs := make([]string, 0, len(doneFlags2str))
  102. for flag, tag := range doneFlags2str {
  103. if flags&flag != 0 {
  104. strs = append(strs, tag)
  105. }
  106. }
  107. return strings.Join(strs, "|")
  108. }
  109. // ENVCHANGE stream
  110. // http://msdn.microsoft.com/en-us/library/dd303449.aspx
  111. func processEnvChg(sess *tdsSession) {
  112. size := sess.buf.uint16()
  113. r := &io.LimitedReader{R: sess.buf, N: int64(size)}
  114. for {
  115. var err error
  116. var envtype uint8
  117. err = binary.Read(r, binary.LittleEndian, &envtype)
  118. if err == io.EOF {
  119. return
  120. }
  121. if err != nil {
  122. badStreamPanic(err)
  123. }
  124. switch envtype {
  125. case envTypDatabase:
  126. sess.database, err = readBVarChar(r)
  127. if err != nil {
  128. badStreamPanic(err)
  129. }
  130. _, err = readBVarChar(r)
  131. if err != nil {
  132. badStreamPanic(err)
  133. }
  134. case envTypLanguage:
  135. //currently ignored
  136. // old value
  137. _, err = readBVarChar(r)
  138. if err != nil {
  139. badStreamPanic(err)
  140. }
  141. // new value
  142. _, err = readBVarChar(r)
  143. if err != nil {
  144. badStreamPanic(err)
  145. }
  146. case envTypCharset:
  147. //currently ignored
  148. // old value
  149. _, err = readBVarChar(r)
  150. if err != nil {
  151. badStreamPanic(err)
  152. }
  153. // new value
  154. _, err = readBVarChar(r)
  155. if err != nil {
  156. badStreamPanic(err)
  157. }
  158. case envTypPacketSize:
  159. packetsize, err := readBVarChar(r)
  160. if err != nil {
  161. badStreamPanic(err)
  162. }
  163. _, err = readBVarChar(r)
  164. if err != nil {
  165. badStreamPanic(err)
  166. }
  167. packetsizei, err := strconv.Atoi(packetsize)
  168. if err != nil {
  169. badStreamPanicf("Invalid Packet size value returned from server (%s): %s", packetsize, err.Error())
  170. }
  171. sess.buf.ResizeBuffer(packetsizei)
  172. case envSortId:
  173. // currently ignored
  174. // old value, should be 0
  175. if _, err = readBVarChar(r); err != nil {
  176. badStreamPanic(err)
  177. }
  178. // new value
  179. if _, err = readBVarChar(r); err != nil {
  180. badStreamPanic(err)
  181. }
  182. case envSortFlags:
  183. // currently ignored
  184. // old value, should be 0
  185. if _, err = readBVarChar(r); err != nil {
  186. badStreamPanic(err)
  187. }
  188. // new value
  189. if _, err = readBVarChar(r); err != nil {
  190. badStreamPanic(err)
  191. }
  192. case envSqlCollation:
  193. // currently ignored
  194. // old value
  195. if _, err = readBVarChar(r); err != nil {
  196. badStreamPanic(err)
  197. }
  198. // new value
  199. if _, err = readBVarChar(r); err != nil {
  200. badStreamPanic(err)
  201. }
  202. case envTypBeginTran:
  203. tranid, err := readBVarByte(r)
  204. if len(tranid) != 8 {
  205. badStreamPanicf("invalid size of transaction identifier: %d", len(tranid))
  206. }
  207. sess.tranid = binary.LittleEndian.Uint64(tranid)
  208. if err != nil {
  209. badStreamPanic(err)
  210. }
  211. if sess.logFlags&logTransaction != 0 {
  212. sess.log.Printf("BEGIN TRANSACTION %x\n", sess.tranid)
  213. }
  214. _, err = readBVarByte(r)
  215. if err != nil {
  216. badStreamPanic(err)
  217. }
  218. case envTypCommitTran, envTypRollbackTran:
  219. _, err = readBVarByte(r)
  220. if err != nil {
  221. badStreamPanic(err)
  222. }
  223. _, err = readBVarByte(r)
  224. if err != nil {
  225. badStreamPanic(err)
  226. }
  227. if sess.logFlags&logTransaction != 0 {
  228. if envtype == envTypCommitTran {
  229. sess.log.Printf("COMMIT TRANSACTION %x\n", sess.tranid)
  230. } else {
  231. sess.log.Printf("ROLLBACK TRANSACTION %x\n", sess.tranid)
  232. }
  233. }
  234. sess.tranid = 0
  235. case envEnlistDTC:
  236. // currently ignored
  237. // old value
  238. if _, err = readBVarChar(r); err != nil {
  239. badStreamPanic(err)
  240. }
  241. // new value, should be 0
  242. if _, err = readBVarChar(r); err != nil {
  243. badStreamPanic(err)
  244. }
  245. case envDefectTran:
  246. // currently ignored
  247. // old value, should be 0
  248. if _, err = readBVarChar(r); err != nil {
  249. badStreamPanic(err)
  250. }
  251. // new value
  252. if _, err = readBVarChar(r); err != nil {
  253. badStreamPanic(err)
  254. }
  255. case envDatabaseMirrorPartner:
  256. sess.partner, err = readBVarChar(r)
  257. if err != nil {
  258. badStreamPanic(err)
  259. }
  260. _, err = readBVarChar(r)
  261. if err != nil {
  262. badStreamPanic(err)
  263. }
  264. case envPromoteTran:
  265. // currently ignored
  266. // old value, should be 0
  267. if _, err = readBVarChar(r); err != nil {
  268. badStreamPanic(err)
  269. }
  270. // dtc token
  271. // spec says it should be L_VARBYTE, so this code might be wrong
  272. if _, err = readBVarChar(r); err != nil {
  273. badStreamPanic(err)
  274. }
  275. case envTranMgrAddr:
  276. // currently ignored
  277. // old value, should be 0
  278. if _, err = readBVarChar(r); err != nil {
  279. badStreamPanic(err)
  280. }
  281. // XACT_MANAGER_ADDRESS = B_VARBYTE
  282. if _, err = readBVarChar(r); err != nil {
  283. badStreamPanic(err)
  284. }
  285. case envTranEnded:
  286. // currently ignored
  287. // old value, B_VARBYTE
  288. if _, err = readBVarChar(r); err != nil {
  289. badStreamPanic(err)
  290. }
  291. // should be 0
  292. if _, err = readBVarChar(r); err != nil {
  293. badStreamPanic(err)
  294. }
  295. case envResetConnAck:
  296. // currently ignored
  297. // old value, should be 0
  298. if _, err = readBVarChar(r); err != nil {
  299. badStreamPanic(err)
  300. }
  301. // should be 0
  302. if _, err = readBVarChar(r); err != nil {
  303. badStreamPanic(err)
  304. }
  305. case envStartedInstanceName:
  306. // currently ignored
  307. // old value, should be 0
  308. if _, err = readBVarChar(r); err != nil {
  309. badStreamPanic(err)
  310. }
  311. // instance name
  312. if _, err = readBVarChar(r); err != nil {
  313. badStreamPanic(err)
  314. }
  315. case envRouting:
  316. // RoutingData message is:
  317. // ValueLength USHORT
  318. // Protocol (TCP = 0) BYTE
  319. // ProtocolProperty (new port) USHORT
  320. // AlternateServer US_VARCHAR
  321. _, err := readUshort(r)
  322. if err != nil {
  323. badStreamPanic(err)
  324. }
  325. protocol, err := readByte(r)
  326. if err != nil || protocol != 0 {
  327. badStreamPanic(err)
  328. }
  329. newPort, err := readUshort(r)
  330. if err != nil {
  331. badStreamPanic(err)
  332. }
  333. newServer, err := readUsVarChar(r)
  334. if err != nil {
  335. badStreamPanic(err)
  336. }
  337. // consume the OLDVALUE = %x00 %x00
  338. _, err = readUshort(r)
  339. if err != nil {
  340. badStreamPanic(err)
  341. }
  342. sess.routedServer = newServer
  343. sess.routedPort = newPort
  344. default:
  345. // ignore rest of records because we don't know how to skip those
  346. sess.log.Printf("WARN: Unknown ENVCHANGE record detected with type id = %d\n", envtype)
  347. break
  348. }
  349. }
  350. }
  351. type returnStatus int32
  352. // http://msdn.microsoft.com/en-us/library/dd358180.aspx
  353. func parseReturnStatus(r *tdsBuffer) returnStatus {
  354. return returnStatus(r.int32())
  355. }
  356. func parseOrder(r *tdsBuffer) (res orderStruct) {
  357. len := int(r.uint16())
  358. res.ColIds = make([]uint16, len/2)
  359. for i := 0; i < len/2; i++ {
  360. res.ColIds[i] = r.uint16()
  361. }
  362. return res
  363. }
  364. // https://msdn.microsoft.com/en-us/library/dd340421.aspx
  365. func parseDone(r *tdsBuffer) (res doneStruct) {
  366. res.Status = r.uint16()
  367. res.CurCmd = r.uint16()
  368. res.RowCount = r.uint64()
  369. return res
  370. }
  371. // https://msdn.microsoft.com/en-us/library/dd340553.aspx
  372. func parseDoneInProc(r *tdsBuffer) (res doneInProcStruct) {
  373. res.Status = r.uint16()
  374. res.CurCmd = r.uint16()
  375. res.RowCount = r.uint64()
  376. return res
  377. }
  378. type sspiMsg []byte
  379. func parseSSPIMsg(r *tdsBuffer) sspiMsg {
  380. size := r.uint16()
  381. buf := make([]byte, size)
  382. r.ReadFull(buf)
  383. return sspiMsg(buf)
  384. }
  385. type loginAckStruct struct {
  386. Interface uint8
  387. TDSVersion uint32
  388. ProgName string
  389. ProgVer uint32
  390. }
  391. func parseLoginAck(r *tdsBuffer) loginAckStruct {
  392. size := r.uint16()
  393. buf := make([]byte, size)
  394. r.ReadFull(buf)
  395. var res loginAckStruct
  396. res.Interface = buf[0]
  397. res.TDSVersion = binary.BigEndian.Uint32(buf[1:])
  398. prognamelen := buf[1+4]
  399. var err error
  400. if res.ProgName, err = ucs22str(buf[1+4+1 : 1+4+1+prognamelen*2]); err != nil {
  401. badStreamPanic(err)
  402. }
  403. res.ProgVer = binary.BigEndian.Uint32(buf[size-4:])
  404. return res
  405. }
  406. // http://msdn.microsoft.com/en-us/library/dd357363.aspx
  407. func parseColMetadata72(r *tdsBuffer) (columns []columnStruct) {
  408. count := r.uint16()
  409. if count == 0xffff {
  410. // no metadata is sent
  411. return nil
  412. }
  413. columns = make([]columnStruct, count)
  414. for i := range columns {
  415. column := &columns[i]
  416. column.UserType = r.uint32()
  417. column.Flags = r.uint16()
  418. // parsing TYPE_INFO structure
  419. column.ti = readTypeInfo(r)
  420. column.ColName = r.BVarChar()
  421. }
  422. return columns
  423. }
  424. // http://msdn.microsoft.com/en-us/library/dd357254.aspx
  425. func parseRow(r *tdsBuffer, columns []columnStruct, row []interface{}) {
  426. for i, column := range columns {
  427. row[i] = column.ti.Reader(&column.ti, r)
  428. }
  429. }
  430. // http://msdn.microsoft.com/en-us/library/dd304783.aspx
  431. func parseNbcRow(r *tdsBuffer, columns []columnStruct, row []interface{}) {
  432. bitlen := (len(columns) + 7) / 8
  433. pres := make([]byte, bitlen)
  434. r.ReadFull(pres)
  435. for i, col := range columns {
  436. if pres[i/8]&(1<<(uint(i)%8)) != 0 {
  437. row[i] = nil
  438. continue
  439. }
  440. row[i] = col.ti.Reader(&col.ti, r)
  441. }
  442. }
  443. // http://msdn.microsoft.com/en-us/library/dd304156.aspx
  444. func parseError72(r *tdsBuffer) (res Error) {
  445. length := r.uint16()
  446. _ = length // ignore length
  447. res.Number = r.int32()
  448. res.State = r.byte()
  449. res.Class = r.byte()
  450. res.Message = r.UsVarChar()
  451. res.ServerName = r.BVarChar()
  452. res.ProcName = r.BVarChar()
  453. res.LineNo = r.int32()
  454. return
  455. }
  456. // http://msdn.microsoft.com/en-us/library/dd304156.aspx
  457. func parseInfo(r *tdsBuffer) (res Error) {
  458. length := r.uint16()
  459. _ = length // ignore length
  460. res.Number = r.int32()
  461. res.State = r.byte()
  462. res.Class = r.byte()
  463. res.Message = r.UsVarChar()
  464. res.ServerName = r.BVarChar()
  465. res.ProcName = r.BVarChar()
  466. res.LineNo = r.int32()
  467. return
  468. }
  469. func processSingleResponse(sess *tdsSession, ch chan tokenStruct) {
  470. defer func() {
  471. if err := recover(); err != nil {
  472. if sess.logFlags&logErrors != 0 {
  473. sess.log.Printf("ERROR: Intercepted panic %v", err)
  474. }
  475. ch <- err
  476. }
  477. close(ch)
  478. }()
  479. packet_type, err := sess.buf.BeginRead()
  480. if err != nil {
  481. if sess.logFlags&logErrors != 0 {
  482. sess.log.Printf("ERROR: BeginRead failed %v", err)
  483. }
  484. ch <- err
  485. return
  486. }
  487. if packet_type != packReply {
  488. badStreamPanicf("invalid response packet type, expected REPLY, actual: %d", packet_type)
  489. }
  490. var columns []columnStruct
  491. errs := make([]Error, 0, 5)
  492. for {
  493. token := token(sess.buf.byte())
  494. if sess.logFlags&logDebug != 0 {
  495. sess.log.Printf("got token %v", token)
  496. }
  497. switch token {
  498. case tokenSSPI:
  499. ch <- parseSSPIMsg(sess.buf)
  500. return
  501. case tokenReturnStatus:
  502. returnStatus := parseReturnStatus(sess.buf)
  503. ch <- returnStatus
  504. case tokenLoginAck:
  505. loginAck := parseLoginAck(sess.buf)
  506. ch <- loginAck
  507. case tokenOrder:
  508. order := parseOrder(sess.buf)
  509. ch <- order
  510. case tokenDoneInProc:
  511. done := parseDoneInProc(sess.buf)
  512. if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 {
  513. sess.log.Printf("(%d row(s) affected)\n", done.RowCount)
  514. }
  515. ch <- done
  516. case tokenDone, tokenDoneProc:
  517. done := parseDone(sess.buf)
  518. done.errors = errs
  519. if sess.logFlags&logDebug != 0 {
  520. sess.log.Printf("got DONE or DONEPROC status=%d", done.Status)
  521. }
  522. if done.Status&doneSrvError != 0 {
  523. ch <- errors.New("SQL Server had internal error")
  524. return
  525. }
  526. if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 {
  527. sess.log.Printf("(%d row(s) affected)\n", done.RowCount)
  528. }
  529. ch <- done
  530. if done.Status&doneMore == 0 {
  531. return
  532. }
  533. case tokenColMetadata:
  534. columns = parseColMetadata72(sess.buf)
  535. ch <- columns
  536. case tokenRow:
  537. row := make([]interface{}, len(columns))
  538. parseRow(sess.buf, columns, row)
  539. ch <- row
  540. case tokenNbcRow:
  541. row := make([]interface{}, len(columns))
  542. parseNbcRow(sess.buf, columns, row)
  543. ch <- row
  544. case tokenEnvChange:
  545. processEnvChg(sess)
  546. case tokenError:
  547. err := parseError72(sess.buf)
  548. if sess.logFlags&logDebug != 0 {
  549. sess.log.Printf("got ERROR %d %s", err.Number, err.Message)
  550. }
  551. errs = append(errs, err)
  552. if sess.logFlags&logErrors != 0 {
  553. sess.log.Println(err.Message)
  554. }
  555. case tokenInfo:
  556. info := parseInfo(sess.buf)
  557. if sess.logFlags&logDebug != 0 {
  558. sess.log.Printf("got INFO %d %s", info.Number, info.Message)
  559. }
  560. if sess.logFlags&logMessages != 0 {
  561. sess.log.Println(info.Message)
  562. }
  563. default:
  564. badStreamPanicf("Unknown token type: %d", token)
  565. }
  566. }
  567. }
  568. type parseRespIter byte
  569. const (
  570. parseRespIterContinue parseRespIter = iota // Continue parsing current token.
  571. parseRespIterNext // Fetch the next token.
  572. parseRespIterDone // Done with parsing the response.
  573. )
  574. type parseRespState byte
  575. const (
  576. parseRespStateNormal parseRespState = iota // Normal response state.
  577. parseRespStateCancel // Query is canceled, wait for server to confirm.
  578. parseRespStateClosing // Waiting for tokens to come through.
  579. )
  580. type parseResp struct {
  581. sess *tdsSession
  582. ctxDone <-chan struct{}
  583. state parseRespState
  584. cancelError error
  585. }
  586. func (ts *parseResp) sendAttention(ch chan tokenStruct) parseRespIter {
  587. if err := sendAttention(ts.sess.buf); err != nil {
  588. ts.dlogf("failed to send attention signal %v", err)
  589. ch <- err
  590. return parseRespIterDone
  591. }
  592. ts.state = parseRespStateCancel
  593. return parseRespIterContinue
  594. }
  595. func (ts *parseResp) dlog(msg string) {
  596. if ts.sess.logFlags&logDebug != 0 {
  597. ts.sess.log.Println(msg)
  598. }
  599. }
  600. func (ts *parseResp) dlogf(f string, v ...interface{}) {
  601. if ts.sess.logFlags&logDebug != 0 {
  602. ts.sess.log.Printf(f, v...)
  603. }
  604. }
  605. func (ts *parseResp) iter(ctx context.Context, ch chan tokenStruct, tokChan chan tokenStruct) parseRespIter {
  606. switch ts.state {
  607. default:
  608. panic("unknown state")
  609. case parseRespStateNormal:
  610. select {
  611. case tok, ok := <-tokChan:
  612. if !ok {
  613. ts.dlog("response finished")
  614. return parseRespIterDone
  615. }
  616. if err, ok := tok.(net.Error); ok && err.Timeout() {
  617. ts.cancelError = err
  618. ts.dlog("got timeout error, sending attention signal to server")
  619. return ts.sendAttention(ch)
  620. }
  621. // Pass the token along.
  622. ch <- tok
  623. return parseRespIterContinue
  624. case <-ts.ctxDone:
  625. ts.ctxDone = nil
  626. ts.dlog("got cancel message, sending attention signal to server")
  627. return ts.sendAttention(ch)
  628. }
  629. case parseRespStateCancel: // Read all responses until a DONE or error is received.Auth
  630. select {
  631. case tok, ok := <-tokChan:
  632. if !ok {
  633. ts.dlog("response finished but waiting for attention ack")
  634. return parseRespIterNext
  635. }
  636. switch tok := tok.(type) {
  637. default:
  638. // Ignore all other tokens while waiting.
  639. // The TDS spec says other tokens may arrive after an attention
  640. // signal is sent. Ignore these tokens and continue looking for
  641. // a DONE with attention confirm mark.
  642. case doneStruct:
  643. if tok.Status&doneAttn != 0 {
  644. ts.dlog("got cancellation confirmation from server")
  645. if ts.cancelError != nil {
  646. ch <- ts.cancelError
  647. ts.cancelError = nil
  648. } else {
  649. ch <- ctx.Err()
  650. }
  651. return parseRespIterDone
  652. }
  653. // If an error happens during cancel, pass it along and just stop.
  654. // We are uncertain to receive more tokens.
  655. case error:
  656. ch <- tok
  657. ts.state = parseRespStateClosing
  658. }
  659. return parseRespIterContinue
  660. case <-ts.ctxDone:
  661. ts.ctxDone = nil
  662. ts.state = parseRespStateClosing
  663. return parseRespIterContinue
  664. }
  665. case parseRespStateClosing: // Wait for current token chan to close.
  666. if _, ok := <-tokChan; !ok {
  667. ts.dlog("response finished")
  668. return parseRespIterDone
  669. }
  670. return parseRespIterContinue
  671. }
  672. }
  673. func processResponse(ctx context.Context, sess *tdsSession, ch chan tokenStruct) {
  674. ts := &parseResp{
  675. sess: sess,
  676. ctxDone: ctx.Done(),
  677. }
  678. defer func() {
  679. // Ensure any remaining error is piped through
  680. // or the query may look like it executed when it actually failed.
  681. if ts.cancelError != nil {
  682. ch <- ts.cancelError
  683. ts.cancelError = nil
  684. }
  685. close(ch)
  686. }()
  687. // Loop over multiple responses.
  688. for {
  689. ts.dlog("initiating resonse reading")
  690. tokChan := make(chan tokenStruct)
  691. go processSingleResponse(sess, tokChan)
  692. // Loop over multiple tokens in response.
  693. tokensLoop:
  694. for {
  695. switch ts.iter(ctx, ch, tokChan) {
  696. case parseRespIterContinue:
  697. // Nothing, continue to next token.
  698. case parseRespIterNext:
  699. break tokensLoop
  700. case parseRespIterDone:
  701. return
  702. }
  703. }
  704. }
  705. }