123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356 |
- // Copyright 2013 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- package ssh
- import (
- "encoding/binary"
- "fmt"
- "io"
- "log"
- "sync"
- "sync/atomic"
- )
// debugMux is a compile-time switch. When true, the mux logs the
// connection-protocol messages it handles. It is off by default.
const debugMux = false
- // chanList is a thread safe channel list.
- type chanList struct {
- // protects concurrent access to chans
- sync.Mutex
- // chans are indexed by the local id of the channel, which the
- // other side should send in the PeersId field.
- chans []*channel
- // This is a debugging aid: it offsets all IDs by this
- // amount. This helps distinguish otherwise identical
- // server/client muxes
- offset uint32
- }
- // Assigns a channel ID to the given channel.
- func (c *chanList) add(ch *channel) uint32 {
- c.Lock()
- defer c.Unlock()
- for i := range c.chans {
- if c.chans[i] == nil {
- c.chans[i] = ch
- return uint32(i) + c.offset
- }
- }
- c.chans = append(c.chans, ch)
- return uint32(len(c.chans)-1) + c.offset
- }
- // getChan returns the channel for the given ID.
- func (c *chanList) getChan(id uint32) *channel {
- id -= c.offset
- c.Lock()
- defer c.Unlock()
- if id < uint32(len(c.chans)) {
- return c.chans[id]
- }
- return nil
- }
- func (c *chanList) remove(id uint32) {
- id -= c.offset
- c.Lock()
- if id < uint32(len(c.chans)) {
- c.chans[id] = nil
- }
- c.Unlock()
- }
- // dropAll forgets all channels it knows, returning them in a slice.
- func (c *chanList) dropAll() []*channel {
- c.Lock()
- defer c.Unlock()
- var r []*channel
- for _, ch := range c.chans {
- if ch == nil {
- continue
- }
- r = append(r, ch)
- }
- c.chans = nil
- return r
- }
- // mux represents the state for the SSH connection protocol, which
- // multiplexes many channels onto a single packet transport.
- type mux struct {
- conn packetConn
- chanList chanList
- incomingChannels chan NewChannel
- globalSentMu sync.Mutex
- globalResponses chan interface{}
- incomingRequests chan *Request
- errCond *sync.Cond
- err error
- }
// globalOff is a package-wide counter. When debugMux is on, newMux
// increments it atomically so that each new chanList gets its own
// offset.
var globalOff uint32
- func (m *mux) Wait() error {
- m.errCond.L.Lock()
- defer m.errCond.L.Unlock()
- for m.err == nil {
- m.errCond.Wait()
- }
- return m.err
- }
- // newMux returns a mux that runs over the given connection.
- func newMux(p packetConn) *mux {
- m := &mux{
- conn: p,
- incomingChannels: make(chan NewChannel, 16),
- globalResponses: make(chan interface{}, 1),
- incomingRequests: make(chan *Request, 16),
- errCond: newCond(),
- }
- if debugMux {
- m.chanList.offset = atomic.AddUint32(&globalOff, 1)
- }
- go m.loop()
- return m
- }
- func (m *mux) sendMessage(msg interface{}) error {
- p := Marshal(msg)
- return m.conn.writePacket(p)
- }
- func (m *mux) SendRequest(name string, wantReply bool, payload []byte) (bool, []byte, error) {
- if wantReply {
- m.globalSentMu.Lock()
- defer m.globalSentMu.Unlock()
- }
- if err := m.sendMessage(globalRequestMsg{
- Type: name,
- WantReply: wantReply,
- Data: payload,
- }); err != nil {
- return false, nil, err
- }
- if !wantReply {
- return false, nil, nil
- }
- msg, ok := <-m.globalResponses
- if !ok {
- return false, nil, io.EOF
- }
- switch msg := msg.(type) {
- case *globalRequestFailureMsg:
- return false, msg.Data, nil
- case *globalRequestSuccessMsg:
- return true, msg.Data, nil
- default:
- return false, nil, fmt.Errorf("ssh: unexpected response to request: %#v", msg)
- }
- }
- // ackRequest must be called after processing a global request that
- // has WantReply set.
- func (m *mux) ackRequest(ok bool, data []byte) error {
- if ok {
- return m.sendMessage(globalRequestSuccessMsg{Data: data})
- }
- return m.sendMessage(globalRequestFailureMsg{Data: data})
- }
- // TODO(hanwen): Disconnect is a transport layer message. We should
- // probably send and receive Disconnect somewhere in the transport
- // code.
- // Disconnect sends a disconnect message.
- func (m *mux) Disconnect(reason uint32, message string) error {
- return m.sendMessage(disconnectMsg{
- Reason: reason,
- Message: message,
- })
- }
- func (m *mux) Close() error {
- return m.conn.Close()
- }
- // loop runs the connection machine. It will process packets until an
- // error is encountered. To synchronize on loop exit, use mux.Wait.
- func (m *mux) loop() {
- var err error
- for err == nil {
- err = m.onePacket()
- }
- for _, ch := range m.chanList.dropAll() {
- ch.close()
- }
- close(m.incomingChannels)
- close(m.incomingRequests)
- close(m.globalResponses)
- m.conn.Close()
- m.errCond.L.Lock()
- m.err = err
- m.errCond.Broadcast()
- m.errCond.L.Unlock()
- if debugMux {
- log.Println("loop exit", err)
- }
- }
- // onePacket reads and processes one packet.
- func (m *mux) onePacket() error {
- packet, err := m.conn.readPacket()
- if err != nil {
- return err
- }
- if debugMux {
- if packet[0] == msgChannelData || packet[0] == msgChannelExtendedData {
- log.Printf("decoding(%d): data packet - %d bytes", m.chanList.offset, len(packet))
- } else {
- p, _ := decode(packet)
- log.Printf("decoding(%d): %d %#v - %d bytes", m.chanList.offset, packet[0], p, len(packet))
- }
- }
- switch packet[0] {
- case msgNewKeys:
- // Ignore notification of key change.
- return nil
- case msgDisconnect:
- return m.handleDisconnect(packet)
- case msgChannelOpen:
- return m.handleChannelOpen(packet)
- case msgGlobalRequest, msgRequestSuccess, msgRequestFailure:
- return m.handleGlobalPacket(packet)
- }
- // assume a channel packet.
- if len(packet) < 5 {
- return parseError(packet[0])
- }
- id := binary.BigEndian.Uint32(packet[1:])
- ch := m.chanList.getChan(id)
- if ch == nil {
- return fmt.Errorf("ssh: invalid channel %d", id)
- }
- return ch.handlePacket(packet)
- }
- func (m *mux) handleDisconnect(packet []byte) error {
- var d disconnectMsg
- if err := Unmarshal(packet, &d); err != nil {
- return err
- }
- if debugMux {
- log.Printf("caught disconnect: %v", d)
- }
- return &d
- }
- func (m *mux) handleGlobalPacket(packet []byte) error {
- msg, err := decode(packet)
- if err != nil {
- return err
- }
- switch msg := msg.(type) {
- case *globalRequestMsg:
- m.incomingRequests <- &Request{
- Type: msg.Type,
- WantReply: msg.WantReply,
- Payload: msg.Data,
- mux: m,
- }
- case *globalRequestSuccessMsg, *globalRequestFailureMsg:
- m.globalResponses <- msg
- default:
- panic(fmt.Sprintf("not a global message %#v", msg))
- }
- return nil
- }
- // handleChannelOpen schedules a channel to be Accept()ed.
- func (m *mux) handleChannelOpen(packet []byte) error {
- var msg channelOpenMsg
- if err := Unmarshal(packet, &msg); err != nil {
- return err
- }
- if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1<<31 {
- failMsg := channelOpenFailureMsg{
- PeersId: msg.PeersId,
- Reason: ConnectionFailed,
- Message: "invalid request",
- Language: "en_US.UTF-8",
- }
- return m.sendMessage(failMsg)
- }
- c := m.newChannel(msg.ChanType, channelInbound, msg.TypeSpecificData)
- c.remoteId = msg.PeersId
- c.maxRemotePayload = msg.MaxPacketSize
- c.remoteWin.add(msg.PeersWindow)
- m.incomingChannels <- c
- return nil
- }
- func (m *mux) OpenChannel(chanType string, extra []byte) (Channel, <-chan *Request, error) {
- ch, err := m.openChannel(chanType, extra)
- if err != nil {
- return nil, nil, err
- }
- return ch, ch.incomingRequests, nil
- }
- func (m *mux) openChannel(chanType string, extra []byte) (*channel, error) {
- ch := m.newChannel(chanType, channelOutbound, extra)
- ch.maxIncomingPayload = channelMaxPacket
- open := channelOpenMsg{
- ChanType: chanType,
- PeersWindow: ch.myWindow,
- MaxPacketSize: ch.maxIncomingPayload,
- TypeSpecificData: extra,
- PeersId: ch.localId,
- }
- if err := m.sendMessage(open); err != nil {
- return nil, err
- }
- switch msg := (<-ch.msg).(type) {
- case *channelOpenConfirmMsg:
- return ch, nil
- case *channelOpenFailureMsg:
- return nil, &OpenChannelError{msg.Reason, msg.Message}
- default:
- return nil, fmt.Errorf("ssh: unexpected packet in response to channel open: %T", msg)
- }
- }
|