add option to support text format (#54)

* augments grpcurl package API in order to handle multiple formats
* deprecates old signature for InvokeRpc
* add command-line flag to use protobuf text format instead of JSON
* use AnyResolver when marshaling to/from JSON
This commit is contained in:
Joshua Humphries
2018-10-16 21:26:16 -04:00
committed by GitHub
parent 397a8c18ca
commit e00ef3eb7c
9 changed files with 752 additions and 104 deletions

View File

@@ -48,7 +48,7 @@ var ErrReflectionNotSupported = errors.New("server does not support the reflecti
// proto (like a file generated by protoc) or a remote server that supports the reflection API.
type DescriptorSource interface {
// ListServices returns a list of fully-qualified service names. It will be all services in a set of
// descriptor files or the set of all services exposed by a GRPC server.
// descriptor files or the set of all services exposed by a gRPC server.
ListServices() ([]string, error)
// FindSymbol returns a descriptor for the given fully-qualified symbol name.
FindSymbol(fullyQualifiedName string) (desc.Descriptor, error)
@@ -181,6 +181,20 @@ func (fs *fileSource) ListServices() ([]string, error) {
return sl, nil
}
// GetAllFiles returns all of the underlying file descriptors. This is
// more thorough and more efficient than the fallback strategy used by
// the GetAllFiles package method, for enumerating all files from a
// descriptor source.
func (fs *fileSource) GetAllFiles() ([]*desc.FileDescriptor, error) {
files := make([]*desc.FileDescriptor, len(fs.files))
i := 0
for _, fd := range fs.files {
files[i] = fd
i++
}
return files, nil
}
func (fs *fileSource) FindSymbol(fullyQualifiedName string) (desc.Descriptor, error) {
for _, fd := range fs.files {
if dsc := fd.FindSymbol(fullyQualifiedName); dsc != nil {
@@ -200,7 +214,7 @@ func (fs *fileSource) AllExtensionsForType(typeName string) ([]*desc.FieldDescri
return fs.er.AllExtensionsForType(typeName), nil
}
// DescriptorSourceFromServer creates a DescriptorSource that uses the given GRPC reflection client
// DescriptorSourceFromServer creates a DescriptorSource that uses the given gRPC reflection client
// to interrogate a server for descriptor information. If the server does not support the reflection
// API then the various DescriptorSource methods will return ErrReflectionNotSupported
func DescriptorSourceFromServer(ctx context.Context, refClient *grpcreflect.Client) DescriptorSource {
@@ -265,6 +279,75 @@ func ListServices(source DescriptorSource) ([]string, error) {
return svcs, nil
}
type sourceWithFiles interface {
GetAllFiles() ([]*desc.FileDescriptor, error)
}
var _ sourceWithFiles = (*fileSource)(nil)
// GetAllFiles uses the given descriptor source to return a list of file descriptors.
func GetAllFiles(source DescriptorSource) ([]*desc.FileDescriptor, error) {
var files []*desc.FileDescriptor
srcFiles, ok := source.(sourceWithFiles)
if ok {
var err error
files, err = srcFiles.GetAllFiles()
if err != nil {
return nil, err
}
} else {
// Source does not implement GetAllFiles method, so use ListServices
// and grab files from there.
allFiles := map[string]*desc.FileDescriptor{}
svcNames, err := source.ListServices()
if err != nil {
return nil, err
}
for _, name := range svcNames {
d, err := source.FindSymbol(name)
if err != nil {
return nil, err
}
addAllFilesToSet(d.GetFile(), allFiles)
}
files = make([]*desc.FileDescriptor, len(allFiles))
i := 0
for _, fd := range allFiles {
files[i] = fd
i++
}
}
sort.Sort(filesByName(files))
return files, nil
}
type filesByName []*desc.FileDescriptor
func (f filesByName) Len() int {
return len(f)
}
func (f filesByName) Less(i, j int) bool {
return f[i].GetName() < f[j].GetName()
}
func (f filesByName) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}
func addAllFilesToSet(fd *desc.FileDescriptor, all map[string]*desc.FileDescriptor) {
if _, ok := all[fd.GetName()]; ok {
// already added
return
}
all[fd.GetName()] = fd
for _, dep := range fd.GetDependencies() {
addAllFilesToSet(dep, all)
}
}
// ListMethods uses the given descriptor source to return a sorted list of method names
// for the specified fully-qualified service name.
func ListMethods(source DescriptorSource, serviceName string) ([]string, error) {
@@ -319,27 +402,54 @@ type InvocationEventHandler interface {
}
// RequestMessageSupplier is a function that is called to retrieve request
// messages for a GRPC operation. The message contents must be valid JSON. If
// the supplier has no more messages, it should return nil, io.EOF.
// messages for a GRPC operation. This type is deprecated and will be removed in
// a future release.
//
// Deprecated: This is only used with the deprecated InvokeRpc. Instead, use
// RequestSupplier with InvokeRPC.
type RequestMessageSupplier func() ([]byte, error)
// InvokeRpc uses the given GRPC connection to invoke the given method. The given descriptor source
// InvokeRpc uses the given gRPC connection to invoke the given method. This function is deprecated
// and will be removed in a future release. It just delegates to the similarly named InvokeRPC
// method, whose signature is only slightly different.
//
// Deprecated: use InvokeRPC instead.
func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn, methodName string,
headers []string, handler InvocationEventHandler, requestData RequestMessageSupplier) error {
return InvokeRPC(ctx, source, cc, methodName, headers, handler, func(m proto.Message) error {
// New function is almost identical, but the request supplier function works differently.
// So we adapt the logic here to maintain compatibility.
data, err := requestData()
if err != nil {
return err
}
return jsonpb.Unmarshal(bytes.NewReader(data), m)
})
}
// RequestSupplier is a function that is called to populate messages for a gRPC operation. The
// function should populate the given message or return a non-nil error. If the supplier has no
// more messages, it should return io.EOF. When it returns io.EOF, it should not in any way
// modify the given message argument.
type RequestSupplier func(proto.Message) error
// InvokeRPC uses the given gRPC channel to invoke the given method. The given descriptor source
// is used to determine the type of method and the type of request and response message. The given
// headers are sent as request metadata. Methods on the given event handler are called as the
// invocation proceeds.
//
// The given requestData function supplies the actual data to send. It should return io.EOF when
// there is no more request data. If it returns a nil error then the returned JSON message should
// not be blank. If the method being invoked is a unary or server-streaming RPC (e.g. exactly one
// request message) and there is no request data (e.g. the first invocation of the function returns
// io.EOF), then a blank request message is sent, as if the request data were an empty object: "{}".
// there is no more request data. If the method being invoked is a unary or server-streaming RPC
// (e.g. exactly one request message) and there is no request data (e.g. the first invocation of
// the function returns io.EOF), then an empty request message is sent.
//
// If the requestData function and the given event handler coordinate or share any state, they should
// be thread-safe. This is because the requestData function may be called from a different goroutine
// than the one invoking event callbacks. (This only happens for bi-directional streaming RPCs, where
// one goroutine sends request messages and another consumes the response messages).
func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn, methodName string,
headers []string, handler InvocationEventHandler, requestData RequestMessageSupplier) error {
func InvokeRPC(ctx context.Context, source DescriptorSource, ch grpcdynamic.Channel, methodName string,
headers []string, handler InvocationEventHandler, requestData RequestSupplier) error {
md := MetadataFromHeaders(headers)
@@ -381,7 +491,7 @@ func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn
handler.OnSendHeaders(md)
ctx = metadata.NewOutgoingContext(ctx, md)
stub := grpcdynamic.NewStubWithMessageFactory(cc, msgFactory)
stub := grpcdynamic.NewStubWithMessageFactory(ch, msgFactory)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -397,21 +507,15 @@ func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn
}
func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestMessageSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message) error {
data, err := requestData()
err := requestData(req)
if err != nil && err != io.EOF {
return fmt.Errorf("error getting request data: %v", err)
}
if len(data) != 0 {
err = jsonpb.UnmarshalString(string(data), req)
if err != nil {
return fmt.Errorf("could not parse given request body as message of type %q: %v", md.GetInputType().GetFullyQualifiedName(), err)
}
}
if err != io.EOF {
// verify there is no second message, which is a usage error
_, err := requestData()
err := requestData(req)
if err == nil {
return fmt.Errorf("method %q is a unary RPC, but request data contained more than 1 message", md.GetFullyQualifiedName())
} else if err != io.EOF {
@@ -443,7 +547,7 @@ func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDesc
}
func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestMessageSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message) error {
// invoke the RPC!
str, err := stub.InvokeRpcClientStream(ctx, md)
@@ -451,8 +555,7 @@ func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met
// Upload each request message in the stream
var resp proto.Message
for err == nil {
var data []byte
data, err = requestData()
err = requestData(req)
if err == io.EOF {
resp, err = str.CloseAndReceive()
break
@@ -460,12 +563,6 @@ func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met
if err != nil {
return fmt.Errorf("error getting request data: %v", err)
}
if len(data) != 0 {
err = jsonpb.UnmarshalString(string(data), req)
if err != nil {
return fmt.Errorf("could not parse given request body as message of type %q: %v", md.GetInputType().GetFullyQualifiedName(), err)
}
}
err = str.SendMsg(req)
if err == io.EOF {
@@ -500,21 +597,15 @@ func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met
}
func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestMessageSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message) error {
data, err := requestData()
err := requestData(req)
if err != nil && err != io.EOF {
return fmt.Errorf("error getting request data: %v", err)
}
if len(data) != 0 {
err = jsonpb.UnmarshalString(string(data), req)
if err != nil {
return fmt.Errorf("could not parse given request body as message of type %q: %v", md.GetInputType().GetFullyQualifiedName(), err)
}
}
if err != io.EOF {
// verify there is no second message, which is a usage error
_, err := requestData()
err := requestData(req)
if err == nil {
return fmt.Errorf("method %q is a server-streaming RPC, but request data contained more than 1 message", md.GetFullyQualifiedName())
} else if err != io.EOF {
@@ -555,7 +646,7 @@ func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met
}
func invokeBidi(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestMessageSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message) error {
// invoke the RPC!
str, err := stub.InvokeRpcBidiStream(ctx, md)
@@ -572,9 +663,8 @@ func invokeBidi(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescr
// Concurrently upload each request message in the stream
var err error
var data []byte
for err == nil {
data, err = requestData()
err = requestData(req)
if err == io.EOF {
err = str.CloseSend()
@@ -584,13 +674,6 @@ func invokeBidi(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescr
err = fmt.Errorf("error getting request data: %v", err)
break
}
if len(data) != 0 {
err = jsonpb.UnmarshalString(string(data), req)
if err != nil {
err = fmt.Errorf("could not parse given request body as message of type %q: %v", md.GetInputType().GetFullyQualifiedName(), err)
break
}
}
err = str.SendMsg(req)
@@ -700,16 +783,29 @@ func MetadataToString(md metadata.MD) string {
if len(md) == 0 {
return "(empty)"
}
keys := make([]string, 0, len(md))
for k := range md {
keys = append(keys, k)
}
sort.Strings(keys)
var b bytes.Buffer
for k, vs := range md {
first := true
for _, k := range keys {
vs := md[k]
for _, v := range vs {
if first {
first = false
} else {
b.WriteString("\n")
}
b.WriteString(k)
b.WriteString(": ")
if strings.HasSuffix(k, "-bin") {
v = base64.StdEncoding.EncodeToString([]byte(v))
}
b.WriteString(v)
b.WriteString("\n")
}
}
return b.String()
@@ -840,7 +936,7 @@ func fullyConvertToDynamic(msgFact *dynamic.MessageFactory, msg proto.Message) (
return dm, nil
}
// ClientTransportCredentials builds transport credentials for a GRPC client using the
// ClientTransportCredentials builds transport credentials for a gRPC client using the
// given properties. If cacertFile is blank, only standard trusted certs are used to
// verify the server certs. If clientCertFile is blank, the client will not use a client
// certificate. If clientCertFile is not blank then clientKeyFile must not be blank.
@@ -877,7 +973,7 @@ func ClientTransportCredentials(insecureSkipVerify bool, cacertFile, clientCertF
return credentials.NewTLS(&tlsConf), nil
}
// ServerTransportCredentials builds transport credentials for a GRPC server using the
// ServerTransportCredentials builds transport credentials for a gRPC server using the
// given properties. If cacertFile is blank, the server will not request client certs
// unless requireClientCerts is true. When requireClientCerts is false and cacertFile is
// not blank, the server will verify client certs when presented, but will not require