decode.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. // Copyright 2013 Matt T. Proud
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pbutil
  15. import (
  16. "encoding/binary"
  17. "errors"
  18. "io"
  19. "github.com/golang/protobuf/proto"
  20. )
  21. var errInvalidVarint = errors.New("invalid varint32 encountered")
  22. // ReadDelimited decodes a message from the provided length-delimited stream,
  23. // where the length is encoded as 32-bit varint prefix to the message body.
  24. // It returns the total number of bytes read and any applicable error. This is
  25. // roughly equivalent to the companion Java API's
  26. // MessageLite#parseDelimitedFrom. As per the reader contract, this function
  27. // calls r.Read repeatedly as required until exactly one message including its
  28. // prefix is read and decoded (or an error has occurred). The function never
  29. // reads more bytes from the stream than required. The function never returns
  30. // an error if a message has been read and decoded correctly, even if the end
  31. // of the stream has been reached in doing so. In that case, any subsequent
  32. // calls return (0, io.EOF).
  33. func ReadDelimited(r io.Reader, m proto.Message) (n int, err error) {
  34. // Per AbstractParser#parsePartialDelimitedFrom with
  35. // CodedInputStream#readRawVarint32.
  36. var headerBuf [binary.MaxVarintLen32]byte
  37. var bytesRead, varIntBytes int
  38. var messageLength uint64
  39. for varIntBytes == 0 { // i.e. no varint has been decoded yet.
  40. if bytesRead >= len(headerBuf) {
  41. return bytesRead, errInvalidVarint
  42. }
  43. // We have to read byte by byte here to avoid reading more bytes
  44. // than required. Each read byte is appended to what we have
  45. // read before.
  46. newBytesRead, err := r.Read(headerBuf[bytesRead : bytesRead+1])
  47. if newBytesRead == 0 {
  48. if err != nil {
  49. return bytesRead, err
  50. }
  51. // A Reader should not return (0, nil), but if it does,
  52. // it should be treated as no-op (according to the
  53. // Reader contract). So let's go on...
  54. continue
  55. }
  56. bytesRead += newBytesRead
  57. // Now present everything read so far to the varint decoder and
  58. // see if a varint can be decoded already.
  59. messageLength, varIntBytes = proto.DecodeVarint(headerBuf[:bytesRead])
  60. }
  61. messageBuf := make([]byte, messageLength)
  62. newBytesRead, err := io.ReadFull(r, messageBuf)
  63. bytesRead += newBytesRead
  64. if err != nil {
  65. return bytesRead, err
  66. }
  67. return bytesRead, proto.Unmarshal(messageBuf, m)
  68. }