add option to support text format; use AnyResolver when marshaling to/from JSON

This commit is contained in:
Josh Humphries 2018-10-15 18:51:42 -04:00
parent 554e69be2c
commit d76351d11b
2 changed files with 286 additions and 85 deletions

View File

@ -1,12 +1,15 @@
// Command grpcurl makes GRPC requests (a la cURL, but HTTP/2). It can use a supplied descriptor file or
// service reflection to translate JSON request data into the appropriate protobuf request data and vice
// versa for presenting the response contents.
// Command grpcurl makes GRPC requests (a la cURL, but HTTP/2). It can use a supplied descriptor
// file, protobuf sources, or service reflection to translate JSON or text request data into the
// appropriate protobuf messages and vice versa for presenting the response contents.
package main
import (
"bufio"
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"strconv"
"strings"
@ -64,10 +67,19 @@ var (
authority = flag.String("authority", "",
":authority pseudo header value to be passed along with underlying HTTP/2 requests. It defaults to `host [ \":\" port ]` part of the target url.")
data = flag.String("d", "",
`JSON request contents. If the value is '@' then the request contents are
read from stdin. For calls that accept a stream of requests, the
`Data for request contents. If the value is '@' then the request contents
are read from stdin. For calls that accept a stream of requests, the
contents should include all such request messages concatenated together
(optionally separated by whitespace).`)
format = flag.String("format", "",
`The format of request data. The allowed values are 'json' (the default)
or 'text'. For 'json', the input data must be in JSON format. Multiple
request values may be concatenated (messages with a JSON representation
other than Object must be separated by whitespace, such as a newline). For
'text', the input data must be in the protobuf text format, in which case
multiple request values must be separated by the "record separate" ASCII
character: 0x1E. The stream should not end in a record separator. If it
does, it will be interpreted as a final, blank message after the separator.`)
connectTimeout = flag.String("connect-timeout", "",
`The maximum time, in seconds, to wait for connection to be established.
Defaults to 10 seconds.`)
@ -81,9 +93,9 @@ var (
preventing batch jobs that use grpcurl from hanging due to slow or bad
network links or due to incorrect stream method usage.`)
emitDefaults = flag.Bool("emit-defaults", false,
`Emit default values from JSON-encoded responses.`)
`Emit default values for JSON-encoded responses.`)
msgTemplate = flag.Bool("msg-template", false,
`When describing messages, show a JSON template for the message type.`)
`When describing messages, show a template of input data.`)
verbose = flag.Bool("v", false,
`Enable verbose output.`)
serverName = flag.String("servername", "", "Override servername when validating TLS certificate.")
@ -168,6 +180,9 @@ func main() {
if (*key == "") != (*cert == "") {
fail(nil, "The -cert and -key arguments must be used together and both be present.")
}
if *format != "" && *format != "json" && *format != "text" {
fail(nil, "The -format option must be 'json' or 'text.")
}
args := flag.Args()
@ -417,10 +432,17 @@ func main() {
// create a request to invoke an RPC
tmpl := makeTemplate(dynamic.NewMessage(dsc))
fmt.Println("\nMessage template:")
jsm := jsonpb.Marshaler{Indent: " ", EmitDefaults: true}
err := jsm.Marshal(os.Stdout, tmpl)
if err != nil {
fail(err, "Failed to print template for message %s", s)
if *format == "" || *format == "json" {
jsm := jsonpb.Marshaler{Indent: " ", EmitDefaults: true}
err := jsm.Marshal(os.Stdout, tmpl)
if err != nil {
fail(err, "Failed to print template for message %s", s)
}
} else /* *format == "text" */ {
err := proto.MarshalText(os.Stdout, tmpl)
if err != nil {
fail(err, "Failed to print template for message %s", s)
}
}
fmt.Println()
}
@ -431,28 +453,48 @@ func main() {
if cc == nil {
cc = dial()
}
var dec *json.Decoder
var in io.Reader
if *data == "@" {
dec = json.NewDecoder(os.Stdin)
in = os.Stdin
} else {
dec = json.NewDecoder(strings.NewReader(*data))
in = strings.NewReader(*data)
}
h := &handler{dec: dec, descSource: descSource}
err := grpcurl.InvokeRpc(ctx, descSource, cc, symbol, append(addlHeaders, rpcHeaders...), h, h.getRequestData)
var rf requestFactory
var h handler
if *format == "" || *format == "json" {
resolver, err := anyResolver(descSource)
if err != nil {
fail(err, "Error creating message resolver")
}
rf = newJsonFactory(in, resolver)
h = handler{
descSource: descSource,
marshaler: jsonpb.Marshaler{
EmitDefaults: *emitDefaults,
AnyResolver: resolver,
},
}
} else {
rf = newTextFactory(in)
h = handler{descSource: descSource}
}
err := grpcurl.InvokeRPC(ctx, descSource, cc, symbol, append(addlHeaders, rpcHeaders...), &h, rf.next)
if err != nil {
fail(err, "Error invoking method %q", symbol)
}
reqSuffix := ""
respSuffix := ""
if h.reqCount != 1 {
reqCount := rf.numRequests()
if reqCount != 1 {
reqSuffix = "s"
}
if h.respCount != 1 {
respSuffix = "s"
}
if *verbose {
fmt.Printf("Sent %d request%s and received %d response%s\n", h.reqCount, reqSuffix, h.respCount, respSuffix)
fmt.Printf("Sent %d request%s and received %d response%s\n", reqCount, reqSuffix, h.respCount, respSuffix)
}
if h.stat.Code() != codes.OK {
fmt.Fprintf(os.Stderr, "ERROR:\n Code: %s\n Message: %s\n", h.stat.Code().String(), h.stat.Message())
@ -512,12 +554,25 @@ func fail(err error, msg string, args ...interface{}) {
}
}
func anyResolver(source grpcurl.DescriptorSource) (jsonpb.AnyResolver, error) {
files, err := grpcurl.GetAllFiles(source)
if err != nil {
return nil, err
}
var er dynamic.ExtensionRegistry
for _, fd := range files {
er.AddExtensionsFromFile(fd)
}
mf := dynamic.NewMessageFactoryWithExtensionRegistry(&er)
return dynamic.AnyResolver(mf, files...), nil
}
type handler struct {
dec *json.Decoder
descSource grpcurl.DescriptorSource
reqCount int
respCount int
stat *status.Status
marshaler jsonpb.Marshaler
}
func (h *handler) OnResolveMethod(md *desc.MethodDescriptor) {
@ -535,31 +590,31 @@ func (*handler) OnSendHeaders(md metadata.MD) {
}
}
func (h *handler) getRequestData() ([]byte, error) {
// we don't use a mutex, though this methods will be called from different goroutine
// than other methods for bidi calls, because this method does not share any state
// with the other methods.
var msg json.RawMessage
if err := h.dec.Decode(&msg); err != nil {
return nil, err
}
h.reqCount++
return msg, nil
}
func (*handler) OnReceiveHeaders(md metadata.MD) {
if *verbose {
fmt.Printf("\nResponse headers received:\n%s\n", grpcurl.MetadataToString(md))
}
}
const rs = string(0x1e)
func (h *handler) OnReceiveResponse(resp proto.Message) {
h.respCount++
if *verbose {
fmt.Print("\nResponse contents:\n")
}
jsm := jsonpb.Marshaler{EmitDefaults: *emitDefaults, Indent: " "}
respStr, err := jsm.MarshalToString(resp)
var respStr string
var err error
if *format == "" || *format == "json" {
respStr, err = h.marshaler.MarshalToString(resp)
} else /* *format == "text" */ {
respStr = proto.MarshalTextString(resp)
if !*verbose {
// if not verbose output, then also include record delimiters,
// so output could potentially piped to another grpcurl process
respStr = respStr + rs
}
}
if err != nil {
fail(err, "failed to generate JSON form of response message")
}
@ -633,3 +688,66 @@ func makeTemplate(msg proto.Message) proto.Message {
}
return dm
}
type requestFactory interface {
next(proto.Message) error
numRequests() int
}
type jsonFactory struct {
dec *json.Decoder
unmarshaler jsonpb.Unmarshaler
requestCount int
}
func newJsonFactory(in io.Reader, resolver jsonpb.AnyResolver) *jsonFactory {
return &jsonFactory{
dec: json.NewDecoder(in),
unmarshaler: jsonpb.Unmarshaler{AnyResolver: resolver},
}
}
func (f *jsonFactory) next(m proto.Message) error {
var msg json.RawMessage
if err := f.dec.Decode(&msg); err != nil {
return err
}
f.requestCount++
return f.unmarshaler.Unmarshal(bytes.NewReader(msg), m)
}
func (f *jsonFactory) numRequests() int {
return f.requestCount
}
type textFactory struct {
r *bufio.Reader
err error
requestCount int
}
func newTextFactory(in io.Reader) *textFactory {
return &textFactory{r: bufio.NewReader(in)}
}
func (f *textFactory) next(m proto.Message) error {
if f.err != nil {
return f.err
}
var b []byte
b, f.err = f.r.ReadBytes(0x1e)
if f.err != nil && f.err != io.EOF {
return f.err
}
// remove delimiter
if b[len(b)-1] == 0x1e {
b = b[:len(b)-1]
}
return proto.UnmarshalText(string(b), m)
}
func (f *textFactory) numRequests() int {
return f.requestCount
}

View File

@ -48,7 +48,7 @@ var ErrReflectionNotSupported = errors.New("server does not support the reflecti
// proto (like a file generated by protoc) or a remote server that supports the reflection API.
type DescriptorSource interface {
// ListServices returns a list of fully-qualified service names. It will be all services in a set of
// descriptor files or the set of all services exposed by a GRPC server.
// descriptor files or the set of all services exposed by a gRPC server.
ListServices() ([]string, error)
// FindSymbol returns a descriptor for the given fully-qualified symbol name.
FindSymbol(fullyQualifiedName string) (desc.Descriptor, error)
@ -181,6 +181,20 @@ func (fs *fileSource) ListServices() ([]string, error) {
return sl, nil
}
// GetAllFiles returns all of the underlying file descriptors. This is
// more thorough and more efficient than the fallback strategy used by
// the GetAllFiles package method, for enumerating all files from a
// descriptor source.
func (fs *fileSource) GetAllFiles() ([]*desc.FileDescriptor, error) {
files := make([]*desc.FileDescriptor, len(fs.files))
i := 0
for _, fd := range fs.files {
files[i] = fd
i++
}
return files, nil
}
func (fs *fileSource) FindSymbol(fullyQualifiedName string) (desc.Descriptor, error) {
for _, fd := range fs.files {
if dsc := fd.FindSymbol(fullyQualifiedName); dsc != nil {
@ -200,7 +214,7 @@ func (fs *fileSource) AllExtensionsForType(typeName string) ([]*desc.FieldDescri
return fs.er.AllExtensionsForType(typeName), nil
}
// DescriptorSourceFromServer creates a DescriptorSource that uses the given GRPC reflection client
// DescriptorSourceFromServer creates a DescriptorSource that uses the given gRPC reflection client
// to interrogate a server for descriptor information. If the server does not support the reflection
// API then the various DescriptorSource methods will return ErrReflectionNotSupported
func DescriptorSourceFromServer(ctx context.Context, refClient *grpcreflect.Client) DescriptorSource {
@ -265,6 +279,75 @@ func ListServices(source DescriptorSource) ([]string, error) {
return svcs, nil
}
type sourceWithFiles interface {
GetAllFiles() ([]*desc.FileDescriptor, error)
}
var _ sourceWithFiles = (*fileSource)(nil)
// GetAllFiles uses the given descriptor source to return a list of file descriptors.
func GetAllFiles(source DescriptorSource) ([]*desc.FileDescriptor, error) {
var files []*desc.FileDescriptor
srcFiles, ok := source.(sourceWithFiles)
if ok {
var err error
files, err = srcFiles.GetAllFiles()
if err != nil {
return nil, err
}
} else {
// Source does not implement GetAllFiles method, so use ListServices
// and grab files from there.
allFiles := map[string]*desc.FileDescriptor{}
svcNames, err := source.ListServices()
if err != nil {
return nil, err
}
for _, name := range svcNames {
d, err := source.FindSymbol(name)
if err != nil {
return nil, err
}
addAllFilesToSet(d.GetFile(), allFiles)
}
files = make([]*desc.FileDescriptor, len(allFiles))
i := 0
for _, fd := range allFiles {
files[i] = fd
i++
}
}
sort.Sort(filesByName(files))
return files, nil
}
type filesByName []*desc.FileDescriptor
func (f filesByName) Len() int {
return len(f)
}
func (f filesByName) Less(i, j int) bool {
return f[i].GetName() < f[j].GetName()
}
func (f filesByName) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}
func addAllFilesToSet(fd *desc.FileDescriptor, all map[string]*desc.FileDescriptor) {
if _, ok := all[fd.GetName()]; ok {
// already added
return
}
all[fd.GetName()] = fd
for _, dep := range fd.GetDependencies() {
addAllFilesToSet(dep, all)
}
}
// ListMethods uses the given descriptor source to return a sorted list of method names
// for the specified fully-qualified service name.
func ListMethods(source DescriptorSource, serviceName string) ([]string, error) {
@ -319,27 +402,54 @@ type InvocationEventHandler interface {
}
// RequestMessageSupplier is a function that is called to retrieve request
// messages for a GRPC operation. The message contents must be valid JSON. If
// the supplier has no more messages, it should return nil, io.EOF.
// messages for a GRPC operation. This type is deprecated and will be removed in
// a future release.
//
// Deprecated: This is only used with the deprecated InvokeRpc. Instead, use
// RequestSupplier with InvokeRPC.
type RequestMessageSupplier func() ([]byte, error)
// InvokeRpc uses the given GRPC connection to invoke the given method. The given descriptor source
// InvokeRpc uses the given gRPC connection to invoke the given method. This function is deprecated
// and will be removed in a future release. It just delegates to the similarly named InvokeRPC
// method, whose signature is only slightly different.
//
// Deprecated: use InvokeRPC instead.
func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn, methodName string,
headers []string, handler InvocationEventHandler, requestData RequestMessageSupplier) error {
return InvokeRPC(ctx, source, cc, methodName, headers, handler, func(m proto.Message) error {
// New function is almost identical, but the request supplier function works differently.
// So we adapt the logic here to maintain compatibility.
data, err := requestData()
if err != nil {
return err
}
return jsonpb.Unmarshal(bytes.NewReader(data), m)
})
}
// RequestSupplier is a function that is called to populate messages for a gRPC operation. The
// function should populate the given message or return a non-nil error. If the supplier has no
// more messages, it should return io.EOF. When it returns io.EOF, it should not in any way
// modify the given message argument.
type RequestSupplier func(proto.Message) error
// InvokeRPC uses the given gRPC channel to invoke the given method. The given descriptor source
// is used to determine the type of method and the type of request and response message. The given
// headers are sent as request metadata. Methods on the given event handler are called as the
// invocation proceeds.
//
// The given requestData function supplies the actual data to send. It should return io.EOF when
// there is no more request data. If it returns a nil error then the returned JSON message should
// not be blank. If the method being invoked is a unary or server-streaming RPC (e.g. exactly one
// request message) and there is no request data (e.g. the first invocation of the function returns
// io.EOF), then a blank request message is sent, as if the request data were an empty object: "{}".
// there is no more request data. If the method being invoked is a unary or server-streaming RPC
// (e.g. exactly one request message) and there is no request data (e.g. the first invocation of
// the function returns io.EOF), then an empty request message is sent.
//
// If the requestData function and the given event handler coordinate or share any state, they should
// be thread-safe. This is because the requestData function may be called from a different goroutine
// than the one invoking event callbacks. (This only happens for bi-directional streaming RPCs, where
// one goroutine sends request messages and another consumes the response messages).
func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn, methodName string,
headers []string, handler InvocationEventHandler, requestData RequestMessageSupplier) error {
func InvokeRPC(ctx context.Context, source DescriptorSource, ch grpcdynamic.Channel, methodName string,
headers []string, handler InvocationEventHandler, requestData RequestSupplier) error {
md := MetadataFromHeaders(headers)
@ -381,7 +491,7 @@ func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn
handler.OnSendHeaders(md)
ctx = metadata.NewOutgoingContext(ctx, md)
stub := grpcdynamic.NewStubWithMessageFactory(cc, msgFactory)
stub := grpcdynamic.NewStubWithMessageFactory(ch, msgFactory)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -397,21 +507,15 @@ func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn
}
func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestMessageSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message) error {
data, err := requestData()
err := requestData(req)
if err != nil && err != io.EOF {
return fmt.Errorf("error getting request data: %v", err)
}
if len(data) != 0 {
err = jsonpb.UnmarshalString(string(data), req)
if err != nil {
return fmt.Errorf("could not parse given request body as message of type %q: %v", md.GetInputType().GetFullyQualifiedName(), err)
}
}
if err != io.EOF {
// verify there is no second message, which is a usage error
_, err := requestData()
err := requestData(req)
if err == nil {
return fmt.Errorf("method %q is a unary RPC, but request data contained more than 1 message", md.GetFullyQualifiedName())
} else if err != io.EOF {
@ -443,7 +547,7 @@ func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDesc
}
func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestMessageSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message) error {
// invoke the RPC!
str, err := stub.InvokeRpcClientStream(ctx, md)
@ -451,8 +555,7 @@ func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met
// Upload each request message in the stream
var resp proto.Message
for err == nil {
var data []byte
data, err = requestData()
err = requestData(req)
if err == io.EOF {
resp, err = str.CloseAndReceive()
break
@ -460,12 +563,6 @@ func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met
if err != nil {
return fmt.Errorf("error getting request data: %v", err)
}
if len(data) != 0 {
err = jsonpb.UnmarshalString(string(data), req)
if err != nil {
return fmt.Errorf("could not parse given request body as message of type %q: %v", md.GetInputType().GetFullyQualifiedName(), err)
}
}
err = str.SendMsg(req)
if err == io.EOF {
@ -500,21 +597,15 @@ func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met
}
func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestMessageSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message) error {
data, err := requestData()
err := requestData(req)
if err != nil && err != io.EOF {
return fmt.Errorf("error getting request data: %v", err)
}
if len(data) != 0 {
err = jsonpb.UnmarshalString(string(data), req)
if err != nil {
return fmt.Errorf("could not parse given request body as message of type %q: %v", md.GetInputType().GetFullyQualifiedName(), err)
}
}
if err != io.EOF {
// verify there is no second message, which is a usage error
_, err := requestData()
err := requestData(req)
if err == nil {
return fmt.Errorf("method %q is a server-streaming RPC, but request data contained more than 1 message", md.GetFullyQualifiedName())
} else if err != io.EOF {
@ -555,7 +646,7 @@ func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met
}
func invokeBidi(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestMessageSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message) error {
// invoke the RPC!
str, err := stub.InvokeRpcBidiStream(ctx, md)
@ -572,9 +663,8 @@ func invokeBidi(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescr
// Concurrently upload each request message in the stream
var err error
var data []byte
for err == nil {
data, err = requestData()
err = requestData(req)
if err == io.EOF {
err = str.CloseSend()
@ -584,13 +674,6 @@ func invokeBidi(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescr
err = fmt.Errorf("error getting request data: %v", err)
break
}
if len(data) != 0 {
err = jsonpb.UnmarshalString(string(data), req)
if err != nil {
err = fmt.Errorf("could not parse given request body as message of type %q: %v", md.GetInputType().GetFullyQualifiedName(), err)
break
}
}
err = str.SendMsg(req)
@ -840,7 +923,7 @@ func fullyConvertToDynamic(msgFact *dynamic.MessageFactory, msg proto.Message) (
return dm, nil
}
// ClientTransportCredentials builds transport credentials for a GRPC client using the
// ClientTransportCredentials builds transport credentials for a gRPC client using the
// given properties. If cacertFile is blank, only standard trusted certs are used to
// verify the server certs. If clientCertFile is blank, the client will not use a client
// certificate. If clientCertFile is not blank then clientKeyFile must not be blank.
@ -877,7 +960,7 @@ func ClientTransportCredentials(insecureSkipVerify bool, cacertFile, clientCertF
return credentials.NewTLS(&tlsConf), nil
}
// ServerTransportCredentials builds transport credentials for a GRPC server using the
// ServerTransportCredentials builds transport credentials for a gRPC server using the
// given properties. If cacertFile is blank, the server will not request client certs
// unless requireClientCerts is true. When requireClientCerts is false and cacertFile is
// not blank, the server will verify client certs when presented, but will not require