增加callOptions

This commit is contained in:
woniu317 2021-12-21 09:48:50 +08:00
parent 8ee6c9423b
commit 4f02cf92a9
1 changed files with 15 additions and 15 deletions

View File

@ -51,7 +51,7 @@ type RequestMessageSupplier func() ([]byte, error)
//
// Deprecated: use InvokeRPC instead.
func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn, methodName string,
headers []string, handler InvocationEventHandler, requestData RequestMessageSupplier) error {
headers []string, handler InvocationEventHandler, requestData RequestMessageSupplier, opts ...grpc.CallOption) error {
return InvokeRPC(ctx, source, cc, methodName, headers, handler, func(m proto.Message) error {
// New function is almost identical, but the request supplier function works differently.
@ -61,7 +61,7 @@ func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn
return err
}
return jsonpb.Unmarshal(bytes.NewReader(data), m)
})
}, opts...)
}
// RequestSupplier is a function that is called to populate messages for a gRPC operation. The
@ -85,7 +85,7 @@ type RequestSupplier func(proto.Message) error
// than the one invoking event callbacks. (This only happens for bi-directional streaming RPCs, where
// one goroutine sends request messages and another consumes the response messages).
func InvokeRPC(ctx context.Context, source DescriptorSource, ch grpcdynamic.Channel, methodName string,
headers []string, handler InvocationEventHandler, requestData RequestSupplier) error {
headers []string, handler InvocationEventHandler, requestData RequestSupplier, opts ...grpc.CallOption) error {
md := MetadataFromHeaders(headers)
@ -140,18 +140,18 @@ func InvokeRPC(ctx context.Context, source DescriptorSource, ch grpcdynamic.Chan
defer cancel()
if mtd.IsClientStreaming() && mtd.IsServerStreaming() {
return invokeBidi(ctx, stub, mtd, handler, requestData, req)
return invokeBidi(ctx, stub, mtd, handler, requestData, req, opts...)
} else if mtd.IsClientStreaming() {
return invokeClientStream(ctx, stub, mtd, handler, requestData, req)
return invokeClientStream(ctx, stub, mtd, handler, requestData, req, opts...)
} else if mtd.IsServerStreaming() {
return invokeServerStream(ctx, stub, mtd, handler, requestData, req)
return invokeServerStream(ctx, stub, mtd, handler, requestData, req, opts...)
} else {
return invokeUnary(ctx, stub, mtd, handler, requestData, req)
return invokeUnary(ctx, stub, mtd, handler, requestData, req, opts...)
}
}
func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message, opts ...grpc.CallOption) error {
err := requestData(req)
if err != nil && err != io.EOF {
@ -170,7 +170,7 @@ func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDesc
// Now we can actually invoke the RPC!
var respHeaders metadata.MD
var respTrailers metadata.MD
resp, err := stub.InvokeRpc(ctx, md, req, grpc.Trailer(&respTrailers), grpc.Header(&respHeaders))
resp, err := stub.InvokeRpc(ctx, md, req, append([]grpc.CallOption{grpc.Trailer(&respTrailers), grpc.Header(&respHeaders)}, opts...)...)
stat, ok := status.FromError(err)
if !ok {
@ -191,10 +191,10 @@ func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDesc
}
func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message, opts ...grpc.CallOption) error {
// invoke the RPC!
str, err := stub.InvokeRpcClientStream(ctx, md)
str, err := stub.InvokeRpcClientStream(ctx, md, opts...)
// Upload each request message in the stream
var resp proto.Message
@ -245,7 +245,7 @@ func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met
}
func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message, opts ...grpc.CallOption) error {
err := requestData(req)
if err != nil && err != io.EOF {
@ -262,7 +262,7 @@ func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met
}
// Now we can actually invoke the RPC!
str, err := stub.InvokeRpcServerStream(ctx, md, req)
str, err := stub.InvokeRpcServerStream(ctx, md, req, opts...)
if respHeaders, err := str.Header(); err == nil {
handler.OnReceiveHeaders(respHeaders)
@ -294,13 +294,13 @@ func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met
}
func invokeBidi(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,
requestData RequestSupplier, req proto.Message) error {
requestData RequestSupplier, req proto.Message, callOptions ...grpc.CallOption) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// invoke the RPC!
str, err := stub.InvokeRpcBidiStream(ctx, md)
str, err := stub.InvokeRpcBidiStream(ctx, md, callOptions...)
var wg sync.WaitGroup
var sendErr atomic.Value