channel.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631
  1. // Copyright 2011 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package ssh
  5. import (
  6. "encoding/binary"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "log"
  11. "sync"
  12. )
  13. const (
  14. minPacketLength = 9
  15. // channelMaxPacket contains the maximum number of bytes that will be
  16. // sent in a single packet. As per RFC 4253, section 6.1, 32k is also
  17. // the minimum.
  18. channelMaxPacket = 1 << 15
  19. // We follow OpenSSH here.
  20. channelWindowSize = 64 * channelMaxPacket
  21. )
  22. // NewChannel represents an incoming request to a channel. It must either be
  23. // accepted for use by calling Accept, or rejected by calling Reject.
  24. type NewChannel interface {
  25. // Accept accepts the channel creation request. It returns the Channel
  26. // and a Go channel containing SSH requests. The Go channel must be
  27. // serviced otherwise the Channel will hang.
  28. Accept() (Channel, <-chan *Request, error)
  29. // Reject rejects the channel creation request. After calling
  30. // this, no other methods on the Channel may be called.
  31. Reject(reason RejectionReason, message string) error
  32. // ChannelType returns the type of the channel, as supplied by the
  33. // client.
  34. ChannelType() string
  35. // ExtraData returns the arbitrary payload for this channel, as supplied
  36. // by the client. This data is specific to the channel type.
  37. ExtraData() []byte
  38. }
  39. // A Channel is an ordered, reliable, flow-controlled, duplex stream
  40. // that is multiplexed over an SSH connection.
  41. type Channel interface {
  42. // Read reads up to len(data) bytes from the channel.
  43. Read(data []byte) (int, error)
  44. // Write writes len(data) bytes to the channel.
  45. Write(data []byte) (int, error)
  46. // Close signals end of channel use. No data may be sent after this
  47. // call.
  48. Close() error
  49. // CloseWrite signals the end of sending in-band
  50. // data. Requests may still be sent, and the other side may
  51. // still send data
  52. CloseWrite() error
  53. // SendRequest sends a channel request. If wantReply is true,
  54. // it will wait for a reply and return the result as a
  55. // boolean, otherwise the return value will be false. Channel
  56. // requests are out-of-band messages so they may be sent even
  57. // if the data stream is closed or blocked by flow control.
  58. SendRequest(name string, wantReply bool, payload []byte) (bool, error)
  59. // Stderr returns an io.ReadWriter that writes to this channel
  60. // with the extended data type set to stderr. Stderr may
  61. // safely be read and written from a different goroutine than
  62. // Read and Write respectively.
  63. Stderr() io.ReadWriter
  64. }
  65. // Request is a request sent outside of the normal stream of
  66. // data. Requests can either be specific to an SSH channel, or they
  67. // can be global.
  68. type Request struct {
  69. Type string
  70. WantReply bool
  71. Payload []byte
  72. ch *channel
  73. mux *mux
  74. }
  75. // Reply sends a response to a request. It must be called for all requests
  76. // where WantReply is true and is a no-op otherwise. The payload argument is
  77. // ignored for replies to channel-specific requests.
  78. func (r *Request) Reply(ok bool, payload []byte) error {
  79. if !r.WantReply {
  80. return nil
  81. }
  82. if r.ch == nil {
  83. return r.mux.ackRequest(ok, payload)
  84. }
  85. return r.ch.ackRequest(ok)
  86. }
  87. // RejectionReason is an enumeration used when rejecting channel creation
  88. // requests. See RFC 4254, section 5.1.
  89. type RejectionReason uint32
  90. const (
  91. Prohibited RejectionReason = iota + 1
  92. ConnectionFailed
  93. UnknownChannelType
  94. ResourceShortage
  95. )
  96. // String converts the rejection reason to human readable form.
  97. func (r RejectionReason) String() string {
  98. switch r {
  99. case Prohibited:
  100. return "administratively prohibited"
  101. case ConnectionFailed:
  102. return "connect failed"
  103. case UnknownChannelType:
  104. return "unknown channel type"
  105. case ResourceShortage:
  106. return "resource shortage"
  107. }
  108. return fmt.Sprintf("unknown reason %d", int(r))
  109. }
  110. func min(a uint32, b int) uint32 {
  111. if a < uint32(b) {
  112. return a
  113. }
  114. return uint32(b)
  115. }
  116. type channelDirection uint8
  117. const (
  118. channelInbound channelDirection = iota
  119. channelOutbound
  120. )
  121. // channel is an implementation of the Channel interface that works
  122. // with the mux class.
  123. type channel struct {
  124. // R/O after creation
  125. chanType string
  126. extraData []byte
  127. localId, remoteId uint32
  128. // maxIncomingPayload and maxRemotePayload are the maximum
  129. // payload sizes of normal and extended data packets for
  130. // receiving and sending, respectively. The wire packet will
  131. // be 9 or 13 bytes larger (excluding encryption overhead).
  132. maxIncomingPayload uint32
  133. maxRemotePayload uint32
  134. mux *mux
  135. // decided is set to true if an accept or reject message has been sent
  136. // (for outbound channels) or received (for inbound channels).
  137. decided bool
  138. // direction contains either channelOutbound, for channels created
  139. // locally, or channelInbound, for channels created by the peer.
  140. direction channelDirection
  141. // Pending internal channel messages.
  142. msg chan interface{}
  143. // Since requests have no ID, there can be only one request
  144. // with WantReply=true outstanding. This lock is held by a
  145. // goroutine that has such an outgoing request pending.
  146. sentRequestMu sync.Mutex
  147. incomingRequests chan *Request
  148. sentEOF bool
  149. // thread-safe data
  150. remoteWin window
  151. pending *buffer
  152. extPending *buffer
  153. // windowMu protects myWindow, the flow-control window.
  154. windowMu sync.Mutex
  155. myWindow uint32
  156. // writeMu serializes calls to mux.conn.writePacket() and
  157. // protects sentClose and packetPool. This mutex must be
  158. // different from windowMu, as writePacket can block if there
  159. // is a key exchange pending.
  160. writeMu sync.Mutex
  161. sentClose bool
  162. // packetPool has a buffer for each extended channel ID to
  163. // save allocations during writes.
  164. packetPool map[uint32][]byte
  165. }
  166. // writePacket sends a packet. If the packet is a channel close, it updates
  167. // sentClose. This method takes the lock c.writeMu.
  168. func (c *channel) writePacket(packet []byte) error {
  169. c.writeMu.Lock()
  170. if c.sentClose {
  171. c.writeMu.Unlock()
  172. return io.EOF
  173. }
  174. c.sentClose = (packet[0] == msgChannelClose)
  175. err := c.mux.conn.writePacket(packet)
  176. c.writeMu.Unlock()
  177. return err
  178. }
  179. func (c *channel) sendMessage(msg interface{}) error {
  180. if debugMux {
  181. log.Printf("send %d: %#v", c.mux.chanList.offset, msg)
  182. }
  183. p := Marshal(msg)
  184. binary.BigEndian.PutUint32(p[1:], c.remoteId)
  185. return c.writePacket(p)
  186. }
  187. // WriteExtended writes data to a specific extended stream. These streams are
  188. // used, for example, for stderr.
  189. func (c *channel) WriteExtended(data []byte, extendedCode uint32) (n int, err error) {
  190. if c.sentEOF {
  191. return 0, io.EOF
  192. }
  193. // 1 byte message type, 4 bytes remoteId, 4 bytes data length
  194. opCode := byte(msgChannelData)
  195. headerLength := uint32(9)
  196. if extendedCode > 0 {
  197. headerLength += 4
  198. opCode = msgChannelExtendedData
  199. }
  200. c.writeMu.Lock()
  201. packet := c.packetPool[extendedCode]
  202. // We don't remove the buffer from packetPool, so
  203. // WriteExtended calls from different goroutines will be
  204. // flagged as errors by the race detector.
  205. c.writeMu.Unlock()
  206. for len(data) > 0 {
  207. space := min(c.maxRemotePayload, len(data))
  208. if space, err = c.remoteWin.reserve(space); err != nil {
  209. return n, err
  210. }
  211. if want := headerLength + space; uint32(cap(packet)) < want {
  212. packet = make([]byte, want)
  213. } else {
  214. packet = packet[:want]
  215. }
  216. todo := data[:space]
  217. packet[0] = opCode
  218. binary.BigEndian.PutUint32(packet[1:], c.remoteId)
  219. if extendedCode > 0 {
  220. binary.BigEndian.PutUint32(packet[5:], uint32(extendedCode))
  221. }
  222. binary.BigEndian.PutUint32(packet[headerLength-4:], uint32(len(todo)))
  223. copy(packet[headerLength:], todo)
  224. if err = c.writePacket(packet); err != nil {
  225. return n, err
  226. }
  227. n += len(todo)
  228. data = data[len(todo):]
  229. }
  230. c.writeMu.Lock()
  231. c.packetPool[extendedCode] = packet
  232. c.writeMu.Unlock()
  233. return n, err
  234. }
  235. func (c *channel) handleData(packet []byte) error {
  236. headerLen := 9
  237. isExtendedData := packet[0] == msgChannelExtendedData
  238. if isExtendedData {
  239. headerLen = 13
  240. }
  241. if len(packet) < headerLen {
  242. // malformed data packet
  243. return parseError(packet[0])
  244. }
  245. var extended uint32
  246. if isExtendedData {
  247. extended = binary.BigEndian.Uint32(packet[5:])
  248. }
  249. length := binary.BigEndian.Uint32(packet[headerLen-4 : headerLen])
  250. if length == 0 {
  251. return nil
  252. }
  253. if length > c.maxIncomingPayload {
  254. // TODO(hanwen): should send Disconnect?
  255. return errors.New("ssh: incoming packet exceeds maximum payload size")
  256. }
  257. data := packet[headerLen:]
  258. if length != uint32(len(data)) {
  259. return errors.New("ssh: wrong packet length")
  260. }
  261. c.windowMu.Lock()
  262. if c.myWindow < length {
  263. c.windowMu.Unlock()
  264. // TODO(hanwen): should send Disconnect with reason?
  265. return errors.New("ssh: remote side wrote too much")
  266. }
  267. c.myWindow -= length
  268. c.windowMu.Unlock()
  269. if extended == 1 {
  270. c.extPending.write(data)
  271. } else if extended > 0 {
  272. // discard other extended data.
  273. } else {
  274. c.pending.write(data)
  275. }
  276. return nil
  277. }
  278. func (c *channel) adjustWindow(n uint32) error {
  279. c.windowMu.Lock()
  280. // Since myWindow is managed on our side, and can never exceed
  281. // the initial window setting, we don't worry about overflow.
  282. c.myWindow += uint32(n)
  283. c.windowMu.Unlock()
  284. return c.sendMessage(windowAdjustMsg{
  285. AdditionalBytes: uint32(n),
  286. })
  287. }
  288. func (c *channel) ReadExtended(data []byte, extended uint32) (n int, err error) {
  289. switch extended {
  290. case 1:
  291. n, err = c.extPending.Read(data)
  292. case 0:
  293. n, err = c.pending.Read(data)
  294. default:
  295. return 0, fmt.Errorf("ssh: extended code %d unimplemented", extended)
  296. }
  297. if n > 0 {
  298. err = c.adjustWindow(uint32(n))
  299. // sendWindowAdjust can return io.EOF if the remote
  300. // peer has closed the connection, however we want to
  301. // defer forwarding io.EOF to the caller of Read until
  302. // the buffer has been drained.
  303. if n > 0 && err == io.EOF {
  304. err = nil
  305. }
  306. }
  307. return n, err
  308. }
  309. func (c *channel) close() {
  310. c.pending.eof()
  311. c.extPending.eof()
  312. close(c.msg)
  313. close(c.incomingRequests)
  314. c.writeMu.Lock()
  315. // This is not necesary for a normal channel teardown, but if
  316. // there was another error, it is.
  317. c.sentClose = true
  318. c.writeMu.Unlock()
  319. // Unblock writers.
  320. c.remoteWin.close()
  321. }
  322. // responseMessageReceived is called when a success or failure message is
  323. // received on a channel to check that such a message is reasonable for the
  324. // given channel.
  325. func (c *channel) responseMessageReceived() error {
  326. if c.direction == channelInbound {
  327. return errors.New("ssh: channel response message received on inbound channel")
  328. }
  329. if c.decided {
  330. return errors.New("ssh: duplicate response received for channel")
  331. }
  332. c.decided = true
  333. return nil
  334. }
  335. func (c *channel) handlePacket(packet []byte) error {
  336. switch packet[0] {
  337. case msgChannelData, msgChannelExtendedData:
  338. return c.handleData(packet)
  339. case msgChannelClose:
  340. c.sendMessage(channelCloseMsg{PeersId: c.remoteId})
  341. c.mux.chanList.remove(c.localId)
  342. c.close()
  343. return nil
  344. case msgChannelEOF:
  345. // RFC 4254 is mute on how EOF affects dataExt messages but
  346. // it is logical to signal EOF at the same time.
  347. c.extPending.eof()
  348. c.pending.eof()
  349. return nil
  350. }
  351. decoded, err := decode(packet)
  352. if err != nil {
  353. return err
  354. }
  355. switch msg := decoded.(type) {
  356. case *channelOpenFailureMsg:
  357. if err := c.responseMessageReceived(); err != nil {
  358. return err
  359. }
  360. c.mux.chanList.remove(msg.PeersId)
  361. c.msg <- msg
  362. case *channelOpenConfirmMsg:
  363. if err := c.responseMessageReceived(); err != nil {
  364. return err
  365. }
  366. if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1<<31 {
  367. return fmt.Errorf("ssh: invalid MaxPacketSize %d from peer", msg.MaxPacketSize)
  368. }
  369. c.remoteId = msg.MyId
  370. c.maxRemotePayload = msg.MaxPacketSize
  371. c.remoteWin.add(msg.MyWindow)
  372. c.msg <- msg
  373. case *windowAdjustMsg:
  374. if !c.remoteWin.add(msg.AdditionalBytes) {
  375. return fmt.Errorf("ssh: invalid window update for %d bytes", msg.AdditionalBytes)
  376. }
  377. case *channelRequestMsg:
  378. req := Request{
  379. Type: msg.Request,
  380. WantReply: msg.WantReply,
  381. Payload: msg.RequestSpecificData,
  382. ch: c,
  383. }
  384. c.incomingRequests <- &req
  385. default:
  386. c.msg <- msg
  387. }
  388. return nil
  389. }
  390. func (m *mux) newChannel(chanType string, direction channelDirection, extraData []byte) *channel {
  391. ch := &channel{
  392. remoteWin: window{Cond: newCond()},
  393. myWindow: channelWindowSize,
  394. pending: newBuffer(),
  395. extPending: newBuffer(),
  396. direction: direction,
  397. incomingRequests: make(chan *Request, 16),
  398. msg: make(chan interface{}, 16),
  399. chanType: chanType,
  400. extraData: extraData,
  401. mux: m,
  402. packetPool: make(map[uint32][]byte),
  403. }
  404. ch.localId = m.chanList.add(ch)
  405. return ch
  406. }
  407. var errUndecided = errors.New("ssh: must Accept or Reject channel")
  408. var errDecidedAlready = errors.New("ssh: can call Accept or Reject only once")
  409. type extChannel struct {
  410. code uint32
  411. ch *channel
  412. }
  413. func (e *extChannel) Write(data []byte) (n int, err error) {
  414. return e.ch.WriteExtended(data, e.code)
  415. }
  416. func (e *extChannel) Read(data []byte) (n int, err error) {
  417. return e.ch.ReadExtended(data, e.code)
  418. }
  419. func (c *channel) Accept() (Channel, <-chan *Request, error) {
  420. if c.decided {
  421. return nil, nil, errDecidedAlready
  422. }
  423. c.maxIncomingPayload = channelMaxPacket
  424. confirm := channelOpenConfirmMsg{
  425. PeersId: c.remoteId,
  426. MyId: c.localId,
  427. MyWindow: c.myWindow,
  428. MaxPacketSize: c.maxIncomingPayload,
  429. }
  430. c.decided = true
  431. if err := c.sendMessage(confirm); err != nil {
  432. return nil, nil, err
  433. }
  434. return c, c.incomingRequests, nil
  435. }
  436. func (ch *channel) Reject(reason RejectionReason, message string) error {
  437. if ch.decided {
  438. return errDecidedAlready
  439. }
  440. reject := channelOpenFailureMsg{
  441. PeersId: ch.remoteId,
  442. Reason: reason,
  443. Message: message,
  444. Language: "en",
  445. }
  446. ch.decided = true
  447. return ch.sendMessage(reject)
  448. }
  449. func (ch *channel) Read(data []byte) (int, error) {
  450. if !ch.decided {
  451. return 0, errUndecided
  452. }
  453. return ch.ReadExtended(data, 0)
  454. }
  455. func (ch *channel) Write(data []byte) (int, error) {
  456. if !ch.decided {
  457. return 0, errUndecided
  458. }
  459. return ch.WriteExtended(data, 0)
  460. }
  461. func (ch *channel) CloseWrite() error {
  462. if !ch.decided {
  463. return errUndecided
  464. }
  465. ch.sentEOF = true
  466. return ch.sendMessage(channelEOFMsg{
  467. PeersId: ch.remoteId})
  468. }
  469. func (ch *channel) Close() error {
  470. if !ch.decided {
  471. return errUndecided
  472. }
  473. return ch.sendMessage(channelCloseMsg{
  474. PeersId: ch.remoteId})
  475. }
  476. // Extended returns an io.ReadWriter that sends and receives data on the given,
  477. // SSH extended stream. Such streams are used, for example, for stderr.
  478. func (ch *channel) Extended(code uint32) io.ReadWriter {
  479. if !ch.decided {
  480. return nil
  481. }
  482. return &extChannel{code, ch}
  483. }
  484. func (ch *channel) Stderr() io.ReadWriter {
  485. return ch.Extended(1)
  486. }
  487. func (ch *channel) SendRequest(name string, wantReply bool, payload []byte) (bool, error) {
  488. if !ch.decided {
  489. return false, errUndecided
  490. }
  491. if wantReply {
  492. ch.sentRequestMu.Lock()
  493. defer ch.sentRequestMu.Unlock()
  494. }
  495. msg := channelRequestMsg{
  496. PeersId: ch.remoteId,
  497. Request: name,
  498. WantReply: wantReply,
  499. RequestSpecificData: payload,
  500. }
  501. if err := ch.sendMessage(msg); err != nil {
  502. return false, err
  503. }
  504. if wantReply {
  505. m, ok := (<-ch.msg)
  506. if !ok {
  507. return false, io.EOF
  508. }
  509. switch m.(type) {
  510. case *channelRequestFailureMsg:
  511. return false, nil
  512. case *channelRequestSuccessMsg:
  513. return true, nil
  514. default:
  515. return false, fmt.Errorf("ssh: unexpected response to channel request: %#v", m)
  516. }
  517. }
  518. return false, nil
  519. }
  520. // ackRequest either sends an ack or nack to the channel request.
  521. func (ch *channel) ackRequest(ok bool) error {
  522. if !ch.decided {
  523. return errUndecided
  524. }
  525. var msg interface{}
  526. if !ok {
  527. msg = channelRequestFailureMsg{
  528. PeersId: ch.remoteId,
  529. }
  530. } else {
  531. msg = channelRequestSuccessMsg{
  532. PeersId: ch.remoteId,
  533. }
  534. }
  535. return ch.sendMessage(msg)
  536. }
  537. func (ch *channel) ChannelType() string {
  538. return ch.chanType
  539. }
  540. func (ch *channel) ExtraData() []byte {
  541. return ch.extraData
  542. }