godeps: update grpc dependency

This commit is contained in:
Anthony Romano
2016-04-05 22:16:53 -07:00
parent 34375ef851
commit 6b0eb9c3c0
17 changed files with 229 additions and 149 deletions

27
cmd/Godeps/Godeps.json generated
View File

@ -1,8 +1,9 @@
{
"ImportPath": "github.com/coreos/etcd/cmd",
"ImportPath": "github.com/coreos/etcd",
"GoVersion": "go1.6",
"GodepVersion": "v60",
"Packages": [
"./..."
"./cmd/..."
],
"Deps": [
{
@ -104,7 +105,7 @@
},
{
"ImportPath": "github.com/mattn/go-runewidth",
"Comment": "travisish-46-gd6bea18",
"Comment": "v0.0.1",
"Rev": "d6bea18f789704b5f83375793155289da36a3c7f"
},
{
@ -204,35 +205,39 @@
},
{
"ImportPath": "google.golang.org/grpc",
"Rev": "b88c12e7caf74af3928de99a864aaa9916fa5aad"
"Rev": "e3d8dfd9076c03272c6e3c7a0ac8671a0e0b374e"
},
{
"ImportPath": "google.golang.org/grpc/codes",
"Rev": "b88c12e7caf74af3928de99a864aaa9916fa5aad"
"Rev": "e3d8dfd9076c03272c6e3c7a0ac8671a0e0b374e"
},
{
"ImportPath": "google.golang.org/grpc/credentials",
"Rev": "b88c12e7caf74af3928de99a864aaa9916fa5aad"
"Rev": "e3d8dfd9076c03272c6e3c7a0ac8671a0e0b374e"
},
{
"ImportPath": "google.golang.org/grpc/grpclog",
"Rev": "b88c12e7caf74af3928de99a864aaa9916fa5aad"
"Rev": "e3d8dfd9076c03272c6e3c7a0ac8671a0e0b374e"
},
{
"ImportPath": "google.golang.org/grpc/internal",
"Rev": "e3d8dfd9076c03272c6e3c7a0ac8671a0e0b374e"
},
{
"ImportPath": "google.golang.org/grpc/metadata",
"Rev": "b88c12e7caf74af3928de99a864aaa9916fa5aad"
"Rev": "e3d8dfd9076c03272c6e3c7a0ac8671a0e0b374e"
},
{
"ImportPath": "google.golang.org/grpc/naming",
"Rev": "b88c12e7caf74af3928de99a864aaa9916fa5aad"
"Rev": "e3d8dfd9076c03272c6e3c7a0ac8671a0e0b374e"
},
{
"ImportPath": "google.golang.org/grpc/peer",
"Rev": "b88c12e7caf74af3928de99a864aaa9916fa5aad"
"Rev": "e3d8dfd9076c03272c6e3c7a0ac8671a0e0b374e"
},
{
"ImportPath": "google.golang.org/grpc/transport",
"Rev": "b88c12e7caf74af3928de99a864aaa9916fa5aad"
"Rev": "e3d8dfd9076c03272c6e3c7a0ac8671a0e0b374e"
},
{
"ImportPath": "gopkg.in/cheggaaa/pb.v1",

View File

@ -1,5 +1,9 @@
language: go
go:
- 1.5.3
- 1.6
before_install:
- go get github.com/axw/gocov/gocov
- go get github.com/mattn/goveralls

View File

@ -1,16 +1,39 @@
# How to contribute
We definitely welcome patches and contribution to grpc! Here is some guideline
We definitely welcome patches and contribution to grpc! Here are some guidelines
and information about how to do so.
## Getting started
## Sending patches
### Legal requirements
### Getting started
1. Check out the code:
$ go get google.golang.org/grpc
$ cd $GOPATH/src/google.golang.org/grpc
1. Create a fork of the grpc-go repository.
1. Add your fork as a remote:
$ git remote add fork git@github.com:$YOURGITHUBUSERNAME/grpc-go.git
1. Make changes, commit them.
1. Run the test suite:
$ make test
1. Push your changes to your fork:
$ git push fork ...
1. Open a pull request.
## Legal requirements
In order to protect both you and ourselves, you will need to sign the
[Contributor License Agreement](https://cla.developers.google.com/clas).
### Filing Issues
## Filing Issues
When filing an issue, make sure to answer these five questions:
1. What version of Go are you using (`go version`)?

View File

@ -1,15 +1,3 @@
.PHONY: \
all \
deps \
updatedeps \
testdeps \
updatetestdeps \
build \
proto \
test \
testrace \
clean \
all: test testrace
deps:
@ -32,7 +20,7 @@ proto:
echo "error: protoc not installed" >&2; \
exit 1; \
fi
go get -v github.com/golang/protobuf/protoc-gen-go
go get -u -v github.com/golang/protobuf/protoc-gen-go
for file in $$(git ls-files '*.proto'); do \
protoc -I $$(dirname $$file) --go_out=plugins=grpc:$$(dirname $$file) $$file; \
done
@ -44,7 +32,20 @@ testrace: testdeps
go test -v -race -cpu 1,4 google.golang.org/grpc/...
clean:
go clean google.golang.org/grpc/...
go clean -i google.golang.org/grpc/...
coverage: testdeps
./coverage.sh --coveralls
.PHONY: \
all \
deps \
updatedeps \
testdeps \
updatetestdeps \
build \
proto \
test \
testrace \
clean \
coverage

View File

@ -7,7 +7,7 @@ The Go implementation of [gRPC](http://www.grpc.io/): A high performance, open s
Installation
------------
To install this package, you need to install Go 1.4 or above and setup your Go workspace on your computer. The simplest way to install the library is to run:
To install this package, you need to install Go and setup your Go workspace on your computer. The simplest way to install the library is to run:
```
$ go get google.golang.org/grpc
@ -16,7 +16,7 @@ $ go get google.golang.org/grpc
Prerequisites
-------------
This requires Go 1.4 or above.
This requires Go 1.5 or later .
Constraints
-----------

View File

@ -97,8 +97,9 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
return stream, nil
}
// Invoke is called by the generated code. It sends the RPC request on the
// wire and returns after response is received.
// Invoke sends the RPC request on the wire and returns after response is received.
// Invoke is called by generated code. Also users can call Invoke directly when it
// is really needed in their use cases.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
var c callInfo
for _, o := range opts {
@ -185,6 +186,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if lastErr != nil {
return toRPCErr(lastErr)
}
return Errorf(stream.StatusCode(), stream.StatusDesc())
return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
}
}

View File

@ -52,10 +52,10 @@ var (
// ErrUnspecTarget indicates that the target address is unspecified.
ErrUnspecTarget = errors.New("grpc: target is unspecified")
// ErrNoTransportSecurity indicates that there is no transport security
// being set for ClientConn. Users should either set one or explicityly
// being set for ClientConn. Users should either set one or explicitly
// call WithInsecure DialOption to disable security.
ErrNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
// ErrCredentialsMisuse indicates that users want to transmit security infomation
// ErrCredentialsMisuse indicates that users want to transmit security information
// (e.g., oauth2 token) which requires secure connection on an insecure
// connection.
ErrCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)")
@ -288,10 +288,9 @@ func NewConn(cc *ClientConn) (*Conn, error) {
if !c.dopts.insecure {
var ok bool
for _, cd := range c.dopts.copts.AuthOptions {
if _, ok := cd.(credentials.TransportAuthenticator); !ok {
continue
if _, ok = cd.(credentials.TransportAuthenticator); ok {
break
}
ok = true
}
if !ok {
return nil, ErrNoTransportSecurity

49
cmd/vendor/google.golang.org/grpc/internal/internal.go generated vendored Normal file
View File

@ -0,0 +1,49 @@
/*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
// Package internal contains gRPC-internal code for testing, to avoid polluting
// the godoc of the top-level grpc package.
package internal
// TestingCloseConns closes all existing transports but keeps
// grpcServer.lis accepting new connections.
//
// The provided grpcServer must be of type *grpc.Server. It is untyped
// for circular dependency reasons.
var TestingCloseConns func(grpcServer interface{})
// TestingUseHandlerImpl enables the http.Handler-based server implementation.
// It must be called before Serve and requires TLS credentials.
//
// The provided grpcServer must be of type *grpc.Server. It is untyped
// for circular dependency reasons.
var TestingUseHandlerImpl func(grpcServer interface{})

View File

@ -325,7 +325,7 @@ type rpcError struct {
}
func (e rpcError) Error() string {
return fmt.Sprintf("rpc error: code = %d desc = %q", e.code, e.desc)
return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
}
// Code returns the error code for err if it was produced by the rpc system.
@ -442,3 +442,11 @@ func backoff(retries int) (t time.Duration) {
}
return time.Duration(backoff)
}
// SupportPackageIsVersion1 is referenced from generated protocol buffer files
// to assert that that code is compatible with this version of the grpc package.
//
// This constant may be renamed in the future if a change in the generated code
// requires a synchronised update of grpc-go and protoc-gen-go. This constant
// should not be referenced from any other code.
const SupportPackageIsVersion1 = true

View File

@ -52,6 +52,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/transport"
)
@ -247,7 +248,6 @@ func (s *Server) Serve(lis net.Listener) error {
delete(s.lis, lis)
s.mu.Unlock()
}()
listenerAddr := lis.Addr()
for {
rawConn, err := lis.Accept()
if err != nil {
@ -258,13 +258,13 @@ func (s *Server) Serve(lis net.Listener) error {
}
// Start a new goroutine to deal with rawConn
// so we don't stall this Accept loop goroutine.
go s.handleRawConn(listenerAddr, rawConn)
go s.handleRawConn(rawConn)
}
}
// handleRawConn is run in its own goroutine and handles a just-accepted
// connection that has not had any I/O performed on it yet.
func (s *Server) handleRawConn(listenerAddr net.Addr, rawConn net.Conn) {
func (s *Server) handleRawConn(rawConn net.Conn) {
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
s.mu.Lock()
@ -284,7 +284,7 @@ func (s *Server) handleRawConn(listenerAddr net.Addr, rawConn net.Conn) {
s.mu.Unlock()
if s.opts.useHandlerImpl {
s.serveUsingHandler(listenerAddr, conn)
s.serveUsingHandler(conn)
} else {
s.serveNewHTTP2Transport(conn, authInfo)
}
@ -340,29 +340,18 @@ var _ http.Handler = (*Server)(nil)
// method as one of the environment types.
//
// conn is the *tls.Conn that's already been authenticated.
func (s *Server) serveUsingHandler(listenerAddr net.Addr, conn net.Conn) {
func (s *Server) serveUsingHandler(conn net.Conn) {
if !s.addConn(conn) {
conn.Close()
return
}
defer s.removeConn(conn)
connDone := make(chan struct{})
hs := &http.Server{
Handler: s,
ConnState: func(c net.Conn, cs http.ConnState) {
if cs == http.StateClosed {
close(connDone)
}
},
}
if err := http2.ConfigureServer(hs, &http2.Server{
h2s := &http2.Server{
MaxConcurrentStreams: s.opts.maxConcurrentStreams,
}); err != nil {
grpclog.Fatalf("grpc: http2.ConfigureServer: %v", err)
return
}
hs.Serve(&singleConnListener{addr: listenerAddr, conn: conn})
<-connDone
h2s.ServeConn(conn, &http2.ServeConnOpts{
Handler: s,
})
}
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -705,10 +694,18 @@ func (s *Server) Stop() {
s.mu.Unlock()
}
// TestingCloseConns closes all exiting transports but keeps s.lis accepting new
// connections.
// This is only for tests and is subject to removal.
func (s *Server) TestingCloseConns() {
func init() {
internal.TestingCloseConns = func(arg interface{}) {
arg.(*Server).testingCloseConns()
}
internal.TestingUseHandlerImpl = func(arg interface{}) {
arg.(*Server).opts.useHandlerImpl = true
}
}
// testingCloseConns closes all existing transports but keeps s.lis
// accepting new connections.
func (s *Server) testingCloseConns() {
s.mu.Lock()
for c := range s.conns {
c.Close()
@ -717,13 +714,6 @@ func (s *Server) TestingCloseConns() {
s.mu.Unlock()
}
// TestingUseHandlerImpl enables the http.Handler-based server implementation.
// It must be called before Serve and requires TLS credentials.
// This is only for tests and is subject to removal.
func (s *Server) TestingUseHandlerImpl() {
s.opts.useHandlerImpl = true
}
// SendHeader sends header metadata. It may be called at most once from a unary
// RPC handler. The ctx is the RPC handler's Context or one derived from it.
func SendHeader(ctx context.Context, md metadata.MD) error {
@ -754,30 +744,3 @@ func SetTrailer(ctx context.Context, md metadata.MD) error {
}
return stream.SetTrailer(md)
}
// singleConnListener is a net.Listener that yields a single conn.
type singleConnListener struct {
mu sync.Mutex
addr net.Addr
conn net.Conn // nil if done
}
func (ln *singleConnListener) Addr() net.Addr { return ln.addr }
func (ln *singleConnListener) Close() error {
ln.mu.Lock()
defer ln.mu.Unlock()
ln.conn = nil
return nil
}
func (ln *singleConnListener) Accept() (net.Conn, error) {
ln.mu.Lock()
defer ln.mu.Unlock()
c := ln.conn
if c == nil {
return nil, io.EOF
}
ln.conn = nil
return c, nil
}

View File

@ -67,7 +67,8 @@ type Stream interface {
// breaks.
// On error, it aborts the stream and returns an RPC status on client
// side. On server side, it simply returns the error to the caller.
// SendMsg is called by generated code.
// SendMsg is called by generated code. Also Users can call SendMsg
// directly when it is really needed in their use cases.
SendMsg(m interface{}) error
// RecvMsg blocks until it receives a message or the stream is
// done. On client side, it returns io.EOF when the stream is done. On
@ -257,7 +258,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
cs.finish(err)
return nil
}
return Errorf(cs.s.StatusCode(), cs.s.StatusDesc())
return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
}
return toRPCErr(err)
}
@ -269,7 +270,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
// Returns io.EOF to indicate the end of the stream.
return
}
return Errorf(cs.s.StatusCode(), cs.s.StatusDesc())
return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
}
return toRPCErr(err)
}

View File

@ -56,43 +56,33 @@ type windowUpdate struct {
increment uint32
}
func (windowUpdate) isItem() bool {
return true
}
func (*windowUpdate) item() {}
type settings struct {
ack bool
ss []http2.Setting
}
func (settings) isItem() bool {
return true
}
func (*settings) item() {}
type resetStream struct {
streamID uint32
code http2.ErrCode
}
func (resetStream) isItem() bool {
return true
}
func (*resetStream) item() {}
type flushIO struct {
}
func (flushIO) isItem() bool {
return true
}
func (*flushIO) item() {}
type ping struct {
ack bool
data [8]byte
}
func (ping) isItem() bool {
return true
}
func (*ping) item() {}
// quotaPool is a pool which accumulates the quota and sends it to acquire()
// when it is available.
@ -195,7 +185,7 @@ func (f *inFlow) onData(n uint32) error {
f.mu.Lock()
defer f.mu.Unlock()
if f.pendingData+f.pendingUpdate+n > f.limit {
return fmt.Errorf("recieved %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate+n, f.limit)
return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate+n, f.limit)
}
if f.conn != nil {
if err := f.conn.onData(n); err != nil {

View File

@ -118,7 +118,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr
// serverHandlerTransport is an implementation of ServerTransport
// which replies to exactly one gRPC request (exactly one HTTP request),
// using the net/http.Handler interface. This http.Handler is guranteed
// using the net/http.Handler interface. This http.Handler is guaranteed
// at this point to be speaking over HTTP/2, so it's able to speak valid
// gRPC.
type serverHandlerTransport struct {
@ -313,16 +313,22 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
readerDone := make(chan struct{})
go func() {
defer close(readerDone)
for {
buf := make([]byte, 1024) // TODO: minimize garbage, optimize recvBuffer code/ownership
// TODO: minimize garbage, optimize recvBuffer code/ownership
const readSize = 8196
for buf := make([]byte, readSize); ; {
n, err := req.Body.Read(buf)
if n > 0 {
s.buf.put(&recvMsg{data: buf[:n]})
s.buf.put(&recvMsg{data: buf[:n:n]})
buf = buf[n:]
}
if err != nil {
s.buf.put(&recvMsg{err: mapRecvMsgError(err)})
return
}
if len(buf) == 0 {
buf = make([]byte, readSize)
}
}
}()

View File

@ -330,6 +330,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: timeoutEncode(timeout)})
}
for k, v := range authData {
// Capital header names are illegal in HTTP/2.
k = strings.ToLower(k)
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v})
}
var (
@ -634,6 +636,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
s.statusCode, ok = http2ErrConvTab[http2.ErrCode(f.ErrCode)]
if !ok {
grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
s.statusCode = codes.Unknown
}
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
@ -719,6 +722,16 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
s.write(recvMsg{err: io.EOF})
}
func handleMalformedHTTP2(s *Stream, err error) {
s.mu.Lock()
if !s.headerDone {
close(s.headerChan)
s.headerDone = true
}
s.mu.Unlock()
s.write(recvMsg{err: err})
}
// reader runs as a separate goroutine in charge of reading data from network
// connection.
//
@ -743,9 +756,24 @@ func (t *http2Client) reader() {
for {
frame, err := t.framer.readFrame()
if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
// is malformed http2.
if se, ok := err.(http2.StreamError); ok {
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
// use error detail to provide better err message
handleMalformedHTTP2(s, StreamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
}
continue
} else {
// Transport error.
t.notifyError(err)
return
}
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
t.operateHeaders(frame)
@ -846,17 +874,6 @@ func (t *http2Client) Error() <-chan struct{} {
func (t *http2Client) notifyError(err error) {
t.mu.Lock()
defer t.mu.Unlock()
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
// is malformed http2.
if se, ok := err.(http2.StreamError); ok {
if s, ok := t.activeStreams[se.StreamID]; ok {
s.write(recvMsg{err: StreamErrorf(http2ErrConvTab[se.Code], "%v", err)})
return
}
}
// make sure t.errorChan is closed only once.
if t.state == reachable {
t.state = unreachable

View File

@ -246,6 +246,16 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
for {
frame, err := t.framer.readFrame()
if err != nil {
if se, ok := err.(http2.StreamError); ok {
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
t.closeStream(s)
}
t.controlBuf.put(&resetStream{se.StreamID, se.Code})
continue
}
t.Close()
return
}

View File

@ -69,6 +69,7 @@ var (
http2.ErrCodeInternal: codes.Internal,
http2.ErrCodeFlowControl: codes.ResourceExhausted,
http2.ErrCodeSettingsTimeout: codes.Internal,
http2.ErrCodeStreamClosed: codes.Internal,
http2.ErrCodeFrameSize: codes.Internal,
http2.ErrCodeRefusedStream: codes.Unavailable,
http2.ErrCodeCancel: codes.Canceled,
@ -272,7 +273,7 @@ type framer struct {
func newFramer(conn net.Conn) *framer {
f := &framer{
reader: conn,
reader: bufio.NewReaderSize(conn, http2IOBufSize),
writer: bufio.NewWriterSize(conn, http2IOBufSize),
}
f.fr = http2.NewFramer(f.writer, f.reader)
@ -404,3 +405,7 @@ func (f *framer) flushWrite() error {
func (f *framer) readFrame() (http2.Frame, error) {
return f.fr.ReadFrame()
}
func (f *framer) errorDetail() error {
return f.fr.ErrorDetail()
}

View File

@ -63,13 +63,11 @@ type recvMsg struct {
err error
}
func (recvMsg) isItem() bool {
return true
}
func (*recvMsg) item() {}
// All items in an out of a recvBuffer should be the same type.
type item interface {
isItem() bool
item()
}
// recvBuffer is an unbounded channel of item.
@ -89,13 +87,15 @@ func newRecvBuffer() *recvBuffer {
func (b *recvBuffer) put(r item) {
b.mu.Lock()
defer b.mu.Unlock()
b.backlog = append(b.backlog, r)
if len(b.backlog) == 0 {
select {
case b.c <- b.backlog[0]:
b.backlog = b.backlog[1:]
case b.c <- r:
return
default:
}
}
b.backlog = append(b.backlog, r)
}
func (b *recvBuffer) load() {
b.mu.Lock()
@ -299,20 +299,18 @@ func (s *Stream) Read(p []byte) (n int, err error) {
return
}
type key int
// The key to save transport.Stream in the context.
const streamKey = key(0)
type streamKey struct{}
// newContextWithStream creates a new context from ctx and attaches stream
// to it.
func newContextWithStream(ctx context.Context, stream *Stream) context.Context {
return context.WithValue(ctx, streamKey, stream)
return context.WithValue(ctx, streamKey{}, stream)
}
// StreamFromContext returns the stream saved in ctx.
func StreamFromContext(ctx context.Context) (s *Stream, ok bool) {
s, ok = ctx.Value(streamKey).(*Stream)
s, ok = ctx.Value(streamKey{}).(*Stream)
return
}