diff --git a/grpcurl.go b/grpcurl.go index 474ff7c..004dc1f 100644 --- a/grpcurl.go +++ b/grpcurl.go @@ -386,7 +386,7 @@ func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn defer cancel() if mtd.IsClientStreaming() && mtd.IsServerStreaming() { - return invokeBidi(ctx, cancel, stub, mtd, handler, requestData, req) + return invokeBidi(ctx, stub, mtd, handler, requestData, req) } else if mtd.IsClientStreaming() { return invokeClientStream(ctx, stub, mtd, handler, requestData, req) } else if mtd.IsServerStreaming() { @@ -554,13 +554,12 @@ func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.Met return nil } -func invokeBidi(ctx context.Context, cancel context.CancelFunc, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler, +func invokeBidi(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler, requestData RequestMessageSupplier, req proto.Message) error { // invoke the RPC! str, err := stub.InvokeRpcBidiStream(ctx, md) - // mutex protects access to handler and sendErr since we'll have two goroutines sharing them var wg sync.WaitGroup var sendErr atomic.Value @@ -600,8 +599,6 @@ func invokeBidi(ctx context.Context, cancel context.CancelFunc, stub grpcdynamic if err != nil { sendErr.Store(err) - // signals error to other goroutine - cancel() } }() }