diff --git a/invoke.go b/invoke.go index b5bae4b..1a3ad5f 100644 --- a/invoke.go +++ b/invoke.go @@ -51,7 +51,7 @@ type RequestMessageSupplier func() ([]byte, error) // // Deprecated: use InvokeRPC instead. func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn, methodName string, - headers []string, handler InvocationEventHandler, requestData RequestMessageSupplier) error { + headers []string, handler InvocationEventHandler, requestData RequestMessageSupplier, opts ...grpc.CallOption) error { return InvokeRPC(ctx, source, cc, methodName, headers, handler, func(m proto.Message) error { // New function is almost identical, but the request supplier function works differently. @@ -61,7 +61,7 @@ func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn return err } return jsonpb.Unmarshal(bytes.NewReader(data), m) - }) + }, opts...) } // RequestSupplier is a function that is called to populate messages for a gRPC operation. The @@ -85,7 +85,7 @@ type RequestSupplier func(proto.Message) error // than the one invoking event callbacks. (This only happens for bi-directional streaming RPCs, where // one goroutine sends request messages and another consumes the response messages). func InvokeRPC(ctx context.Context, source DescriptorSource, ch grpcdynamic.Channel, methodName string, - headers []string, handler InvocationEventHandler, requestData RequestSupplier) error { + headers []string, handler InvocationEventHandler, requestData RequestSupplier, opts ...grpc.CallOption) error { md := MetadataFromHeaders(headers) @@ -140,18 +140,18 @@ func InvokeRPC(ctx context.Context, source DescriptorSource, ch grpcdynamic.Chan defer cancel() if mtd.IsClientStreaming() && mtd.IsServerStreaming() { - return invokeBidi(ctx, stub, mtd, handler, requestData, req) + return invokeBidi(ctx, stub, mtd, handler, requestData, req, opts...) } else if mtd.IsClientStreaming() { - return invokeClientStream(ctx, stub, mtd, handler, requestData, req) + return invokeClientStream(ctx, stub, mtd, handler, requestData, req, opts...) } else if mtd.IsServerStreaming() { - return invokeServerStream(ctx, stub, mtd, handler, requestData, req) + return invokeServerStream(ctx, stub, mtd, handler, requestData, req, opts...) } else { - return invokeUnary(ctx, stub, mtd, handler, requestData, req) + return invokeUnary(ctx, stub, mtd, handler, requestData, req, opts...) } } func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler, - requestData RequestSupplier, req proto.Message) error { + requestData RequestSupplier, req proto.Message, opts ...grpc.CallOption) error { err := requestData(req) if err != nil && err != io.EOF { @@ -170,7 +170,7 @@ func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDesc // Now we can actually invoke the RPC! var respHeaders metadata.MD var respTrailers metadata.MD - resp, err := stub.InvokeRpc(ctx, md, req, grpc.Trailer(&respTrailers), grpc.Header(&respHeaders)) + resp, err := stub.InvokeRpc(ctx, md, req, append([]grpc.CallOption{grpc.Trailer(&respTrailers), grpc.Header(&respHeaders)}, opts...)...) stat, ok := status.FromError(err) if !ok { @@ -191,10 +191,10 @@ func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDesc } func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler, - requestData RequestSupplier, req proto.Message) error { + requestData RequestSupplier, req proto.Message, opts ...grpc.CallOption) error { // invoke the RPC! - str, err := stub.InvokeRpcClientStream(ctx, md) + str, err := stub.InvokeRpcClientStream(ctx, md, opts...) // Upload each request message in the stream var resp proto.Message @@ -245,7 +245,7 @@ func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met } func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler, - requestData RequestSupplier, req proto.Message) error { + requestData RequestSupplier, req proto.Message, opts ...grpc.CallOption) error { err := requestData(req) if err != nil && err != io.EOF { @@ -262,7 +262,7 @@ func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met } // Now we can actually invoke the RPC! - str, err := stub.InvokeRpcServerStream(ctx, md, req) + str, err := stub.InvokeRpcServerStream(ctx, md, req, opts...) if respHeaders, err := str.Header(); err == nil { handler.OnReceiveHeaders(respHeaders) @@ -294,13 +294,13 @@ func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met } func invokeBidi(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler, - requestData RequestSupplier, req proto.Message) error { + requestData RequestSupplier, req proto.Message, callOptions ...grpc.CallOption) error { ctx, cancel := context.WithCancel(ctx) defer cancel() // invoke the RPC! - str, err := stub.InvokeRpcBidiStream(ctx, md) + str, err := stub.InvokeRpcBidiStream(ctx, md, callOptions...) var wg sync.WaitGroup var sendErr atomic.Value