add bankdemo, from Gophercon 2018 demo

This commit is contained in:
Josh Humphries 2018-08-29 23:31:49 -06:00
parent 1e72fa2cd1
commit e1a9991958
11 changed files with 3844 additions and 0 deletions

View File

@ -0,0 +1,9 @@
# bankdemo
The `bankdemo` program is an example gRPC server that was used to demo `grpcurl` at Gophercon 2018.
It demonstrates interesting concepts for building a gRPC server, including chat functionality (that relies on full-duplex bidirectional streams). This code was written specifically to provide an interesting concrete demonstration and, as such, should not be considered in any way production-worthy.
The demo app tracks user accounts, transactions, and balances completely in memory. Every few seconds, as well as on graceful shutdown (like when the server receives a SIGTERM or SIGINT signal), this state is saved to a file named `accounts.json`, so that the data can be restored if the process restarts.
In addition to bank account data, the server also tracks "chat sessions", for demonstrating bidirectional streams in the form of an application where customers can chat with support agents.

View File

@ -0,0 +1,49 @@
package main
import (
"strings"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
func getCustomer(ctx context.Context) string {
// we'll just treat the "auth token" as if it is a
// customer ID, but reject tokens that begin with "agent"
// (those are auth tokens for support agents, not customers)
cust := getAuthCode(ctx)
if strings.HasPrefix(cust, "agent") {
return ""
}
return cust
}
func getAgent(ctx context.Context) string {
// we'll just treat the "auth token" as if it is an agent's
// user ID, but reject tokens that don't begin with "agent"
// (those are auth tokens for customers, not support agents)
agent := getAuthCode(ctx)
if !strings.HasPrefix(agent, "agent") {
return ""
}
return agent
}
func getAuthCode(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ""
}
vals := md.Get("authorization")
if len(vals) != 1 {
return ""
}
pieces := strings.SplitN(strings.ToLower(vals[0]), " ", 2)
if len(pieces) != 2 {
return ""
}
if pieces[0] != "token" {
return ""
}
return pieces[1]
}

View File

@ -0,0 +1,319 @@
package main
import (
"fmt"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// bankServer implements the Bank gRPC service.
type bankServer struct {
allAccounts *accounts
}
func (s *bankServer) OpenAccount(ctx context.Context, req *OpenAccountRequest) (*Account, error) {
cust := getCustomer(ctx)
if cust == "" {
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
switch req.Type {
case Account_CHECKING, Account_SAVING, Account_MONEY_MARKET:
if req.InitialDepositCents < 0 {
return nil, status.Errorf(codes.InvalidArgument, "initial deposit amount cannot be negative: %s", dollars(req.InitialDepositCents))
}
case Account_LINE_OF_CREDIT, Account_LOAN, Account_EQUITIES:
if req.InitialDepositCents != 0 {
return nil, status.Errorf(codes.InvalidArgument, "initial deposit amount must be zero for account type %v: %s", req.Type, dollars(req.InitialDepositCents))
}
default:
return nil, status.Errorf(codes.InvalidArgument, "invalid account type: %v", req.Type)
}
s.allAccounts.mu.Lock()
defer s.allAccounts.mu.Unlock()
accountNums, ok := s.allAccounts.AccountNumbersByCustomer[cust]
if !ok {
// no accounts for this customer? it's a new customer
s.allAccounts.Customers = append(s.allAccounts.Customers, cust)
}
num := s.allAccounts.LastAccountNum + 1
s.allAccounts.LastAccountNum = num
s.allAccounts.AccountNumbers = append(s.allAccounts.AccountNumbers, num)
accountNums = append(accountNums, num)
s.allAccounts.AccountNumbersByCustomer[cust] = accountNums
var acct account
acct.AccountNumber = num
acct.BalanceCents = req.InitialDepositCents
acct.Transactions = append(acct.Transactions, &Transaction{
AccountNumber: num,
SeqNumber: 1,
Date: ptypes.TimestampNow(),
AmountCents: req.InitialDepositCents,
Desc: "initial deposit",
})
s.allAccounts.AccountsByNumber[num] = &acct
return &acct.Account, nil
}
func (s *bankServer) CloseAccount(ctx context.Context, req *CloseAccountRequest) (*empty.Empty, error) {
cust := getCustomer(ctx)
if cust == "" {
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
s.allAccounts.mu.Lock()
defer s.allAccounts.mu.Unlock()
acctNums := s.allAccounts.AccountNumbersByCustomer[cust]
found := -1
for i, num := range acctNums {
if num == req.AccountNumber {
found = i
break
}
}
if found == -1 {
return nil, status.Errorf(codes.NotFound, "you have no account numbered %d", req.AccountNumber)
}
for i, num := range s.allAccounts.AccountNumbers {
if num == req.AccountNumber {
s.allAccounts.AccountNumbers = append(s.allAccounts.AccountNumbers[:i], s.allAccounts.AccountNumbers[i+1:]...)
break
}
}
acct := s.allAccounts.AccountsByNumber[req.AccountNumber]
if acct.BalanceCents != 0 {
return nil, status.Errorf(codes.FailedPrecondition, "account %d cannot be closed because it has a non-zero balance: %s", req.AccountNumber, dollars(acct.BalanceCents))
}
s.allAccounts.AccountNumbersByCustomer[cust] = append(acctNums[:found], acctNums[found+1:]...)
delete(s.allAccounts.AccountsByNumber, req.AccountNumber)
return &empty.Empty{}, nil
}
func (s *bankServer) GetAccounts(ctx context.Context, _ *empty.Empty) (*GetAccountsResponse, error) {
cust := getCustomer(ctx)
if cust == "" {
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
s.allAccounts.mu.RLock()
defer s.allAccounts.mu.RUnlock()
accountNums := s.allAccounts.AccountNumbersByCustomer[cust]
var accounts []*Account
for _, num := range accountNums {
accounts = append(accounts, &s.allAccounts.AccountsByNumber[num].Account)
}
return &GetAccountsResponse{Accounts: accounts}, nil
}
func (s *bankServer) GetTransactions(req *GetTransactionsRequest, stream Bank_GetTransactionsServer) error {
cust := getCustomer(stream.Context())
if cust == "" {
return status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
acct, err := func() (*account, error) {
s.allAccounts.mu.Lock()
defer s.allAccounts.mu.Unlock()
acctNums := s.allAccounts.AccountNumbersByCustomer[cust]
for _, num := range acctNums {
if num == req.AccountNumber {
return s.allAccounts.AccountsByNumber[num], nil
}
}
return nil, status.Errorf(codes.NotFound, "you have no account numbered %d", req.AccountNumber)
}()
if err != nil {
return err
}
var start, end time.Time
if req.Start != nil {
start, err = ptypes.Timestamp(req.Start)
if err != nil {
return err
}
}
if req.End != nil {
end, err = ptypes.Timestamp(req.End)
if err != nil {
return err
}
} else {
end = time.Date(9999, 12, 31, 23, 59, 59, 999999999, time.Local)
}
acct.mu.RLock()
txns := acct.Transactions
acct.mu.RUnlock()
for _, txn := range txns {
t, err := ptypes.Timestamp(txn.Date)
if err != nil {
return err
}
if (t.After(start) || t.Equal(start)) &&
(t.Before(end) || t.Equal(end)) {
if err := stream.Send(txn); err != nil {
return err
}
}
}
return nil
}
func (s *bankServer) Deposit(ctx context.Context, req *DepositRequest) (*BalanceResponse, error) {
cust := getCustomer(ctx)
if cust == "" {
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
switch req.Source {
case DepositRequest_ACH, DepositRequest_CASH, DepositRequest_CHECK, DepositRequest_WIRE:
// ok
default:
return nil, status.Errorf(codes.InvalidArgument, "unknown deposit source: %v", req.Source)
}
if req.AmountCents <= 0 {
return nil, status.Errorf(codes.InvalidArgument, "deposit amount cannot be non-positive: %s", dollars(req.AmountCents))
}
desc := fmt.Sprintf("%v deposit", req.Source)
if req.Desc != "" {
desc = fmt.Sprintf("%s: %s", desc, req.Desc)
}
newBalance, err := s.newTransaction(cust, req.AccountNumber, req.AmountCents, desc)
if err != nil {
return nil, err
}
return &BalanceResponse{
AccountNumber: req.AccountNumber,
BalanceCents: newBalance,
}, nil
}
func (s *bankServer) getAccount(cust string, acctNumber uint64) (*account, error) {
s.allAccounts.mu.Lock()
defer s.allAccounts.mu.Unlock()
acctNums := s.allAccounts.AccountNumbersByCustomer[cust]
for _, num := range acctNums {
if num == acctNumber {
return s.allAccounts.AccountsByNumber[num], nil
}
}
return nil, status.Errorf(codes.NotFound, "you have no account numbered %d", acctNumber)
}
func (s *bankServer) newTransaction(cust string, acctNumber uint64, amountCents int32, desc string) (int32, error) {
acct, err := s.getAccount(cust, acctNumber)
if err != nil {
return 0, err
}
return acct.newTransaction(amountCents, desc)
}
func (s *bankServer) Withdraw(ctx context.Context, req *WithdrawRequest) (*BalanceResponse, error) {
cust := getCustomer(ctx)
if cust == "" {
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
if req.AmountCents >= 0 {
return nil, status.Errorf(codes.InvalidArgument, "withdrawal amount cannot be non-negative: %s", dollars(req.AmountCents))
}
newBalance, err := s.newTransaction(cust, req.AccountNumber, req.AmountCents, req.Desc)
if err != nil {
return nil, err
}
return &BalanceResponse{
AccountNumber: req.AccountNumber,
BalanceCents: newBalance,
}, nil
}
func dollars(amountCents int32) string {
return fmt.Sprintf("$%02f", float64(amountCents)/100)
}
func (s *bankServer) Transfer(ctx context.Context, req *TransferRequest) (*TransferResponse, error) {
cust := getCustomer(ctx)
if cust == "" {
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
if req.AmountCents <= 0 {
return nil, status.Errorf(codes.InvalidArgument, "transfer amount cannot be non-positive: %s", dollars(req.AmountCents))
}
var srcAcct *account
var srcDesc string
switch src := req.Source.(type) {
case *TransferRequest_ExternalSource:
srcDesc = fmt.Sprintf("ACH %09d:%06d", src.ExternalSource.AchRoutingNumber, src.ExternalSource.AchAccountNumber)
if src.ExternalSource.AchAccountNumber == 0 || src.ExternalSource.AchRoutingNumber == 0 {
return nil, status.Errorf(codes.InvalidArgument, "external source routing and account numbers cannot be zero: %s", srcDesc)
}
case *TransferRequest_SourceAccountNumber:
srcDesc = fmt.Sprintf("account %06d", src.SourceAccountNumber)
var err error
if srcAcct, err = s.getAccount(cust, src.SourceAccountNumber); err != nil {
return nil, err
}
}
var destAcct *account
var destDesc string
switch dest := req.Dest.(type) {
case *TransferRequest_ExternalDest:
destDesc = fmt.Sprintf("ACH %09d:%06d", dest.ExternalDest.AchRoutingNumber, dest.ExternalDest.AchAccountNumber)
if dest.ExternalDest.AchAccountNumber == 0 || dest.ExternalDest.AchRoutingNumber == 0 {
return nil, status.Errorf(codes.InvalidArgument, "external source routing and account numbers cannot be zero: %s", destDesc)
}
case *TransferRequest_DestAccountNumber:
destDesc = fmt.Sprintf("account %06d", dest.DestAccountNumber)
var err error
if destAcct, err = s.getAccount(cust, dest.DestAccountNumber); err != nil {
return nil, err
}
}
var srcBalance int32
if srcAcct != nil {
desc := fmt.Sprintf("transfer to %s", destDesc)
if req.Desc != "" {
desc = fmt.Sprintf("%s: %s", desc, req.Desc)
}
var err error
if srcBalance, err = srcAcct.newTransaction(-req.AmountCents, desc); err != nil {
return nil, err
}
}
var destBalance int32
if destAcct != nil {
desc := fmt.Sprintf("transfer from %s", srcDesc)
if req.Desc != "" {
desc = fmt.Sprintf("%s: %s", desc, req.Desc)
}
var err error
if destBalance, err = destAcct.newTransaction(req.AmountCents, desc); err != nil {
return nil, err
}
}
return &TransferResponse{
SrcAccountNumber: req.GetSourceAccountNumber(),
SrcBalanceCents: srcBalance,
DestAccountNumber: req.GetDestAccountNumber(),
DestBalanceCents: destBalance,
}, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,238 @@
syntax = "proto3";
option go_package = "main";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
// Bank provides operations for interacting with bank accounts. All
// operations operate for the authenticated user (identified via an
// "authorization" request header, where the type is "token" and the
// credential is the customer's ID).
service Bank {
// OpenAccount creates an account with the type and given initial deposit
// as its balance.
rpc OpenAccount(OpenAccountRequest) returns (Account);
// CloseAccount closes the indicated account. An account can only be
// closed if its balance is zero.
rpc CloseAccount(CloseAccountRequest) returns (google.protobuf.Empty);
// GetAccounts lists all accounts for the current customer.
rpc GetAccounts(google.protobuf.Empty) returns (GetAccountsResponse);
// GetTransactions streams all transactions that match the given criteria.
// If the given start date is not specified, transactions since beginning
// of time are included. Similarly, if the given end date is not specified,
// transactions all the way to the presnet are included.
rpc GetTransactions(GetTransactionsRequest) returns (stream Transaction);
// Deposit increases the balance of an account by depositing funds into it.
rpc Deposit(DepositRequest) returns (BalanceResponse);
// Withdraw decreases the balance of an account by withdrawing funds from it.
rpc Withdraw(WithdrawRequest) returns (BalanceResponse);
// Transfer moves money from one account to another. The source and destination
// accounts can be with this bank (e.g. "local" account numbers) or can be
// external accounts, identified by their ACH routing and account numbers.
rpc Transfer(TransferRequest) returns (TransferResponse);
}
// Support provides an interactive chat service, for customers to interact with
// the bank's support agents. A single stream, for either of the two methods, is
// a stateful connection to a single "chat session". Streams are initially disconnected
// (not part of any session). A stream must be disconnected from a session (via customer
// hang up or via agent leaving a session) before it can be connected to a new one.
service Support {
// ChatCustomer is used by a customer-facing app to send the customer's messages
// to a chat session. The customer is how initiates and terminates (via "hangup")
// a chat session. Only customers may invoke this method (e.g. requests must
// include customer auth credentials).
rpc ChatCustomer(stream ChatCustomerRequest) returns (stream ChatCustomerResponse);
// ChatAgent is used by an agent-facing app to allow an agent to reply to a
// customer's messages in a chat session. The agent may accept a chat session,
// which defaults to the session awaiting an agent for the longest period of time
// (FIFO queue).
rpc ChatAgent(stream ChatAgentRequest) returns (stream ChatAgentResponse);
}
message OpenAccountRequest {
int32 initial_deposit_cents = 1;
Account.Type type = 2;
}
message CloseAccountRequest {
uint64 account_number = 1;
}
message GetAccountsResponse {
repeated Account accounts = 1;
}
message Account {
uint64 account_number = 1;
enum Type {
UNKNOWN = 0;
CHECKING = 1;
SAVING = 2;
MONEY_MARKET = 3;
LINE_OF_CREDIT = 4;
LOAN = 5;
EQUITIES = 6;
}
Type type = 2;
int32 balance_cents = 3;
}
message GetTransactionsRequest {
uint64 account_number = 1;
google.protobuf.Timestamp start = 2;
google.protobuf.Timestamp end = 3;
}
message Transaction {
uint64 account_number = 1;
uint64 seq_number = 2;
google.protobuf.Timestamp date = 3;
int32 amount_cents = 4;
string desc = 5;
}
message DepositRequest {
uint64 account_number = 1;
int32 amount_cents = 2;
enum Source {
UNKNOWN = 0;
CASH = 1;
CHECK = 2;
ACH = 3;
WIRE = 4;
}
Source source = 3;
string desc = 4;
}
message BalanceResponse {
uint64 account_number = 1;
int32 balance_cents = 2;
}
message WithdrawRequest {
uint64 account_number = 1;
int32 amount_cents = 2;
string desc = 3;
}
message TransferRequest {
message ExternalAccount {
uint64 ach_routing_number = 1;
uint64 ach_account_number = 2;
}
oneof source {
uint64 source_account_number = 1;
ExternalAccount external_source = 2;
}
oneof dest {
uint64 dest_account_number = 3;
ExternalAccount external_dest = 4;
}
int32 amount_cents = 5;
string desc = 6;
}
message TransferResponse {
uint64 src_account_number = 1;
int32 src_balance_cents = 2;
uint64 dest_account_number = 3;
int32 dest_balance_cents = 4;
}
enum Void {
VOID = 0;
}
message ChatCustomerRequest {
oneof req {
// init is used when a chat stream is not part of a
// chat session. This is a stream's initial state, as well as
// the state after a "hang_up" request is sent. This creates
// a new state session or resumes an existing one.
InitiateChat init = 1;
// msg is used to send the customer's messages to support
// agents.
string msg = 2;
// hang_up is used to terminate a chat session. If a stream
// is broken, but the session was not terminated, the client
// may initiate a new stream and use init to resume that
// session. Sessions are not terminated unless done so
// explicitly via sending this kind of request on the stream.
Void hang_up = 3;
}
}
message InitiateChat {
string resume_session_id = 1;
}
message AgentMessage {
string agent_name = 1;
string msg = 2;
}
message ChatCustomerResponse {
oneof resp {
// session is sent from the server when the stream is connected
// to a chat session. This happens after an init request is sent
// and the stream is connected to either a new or resumed session.
Session session = 1;
// msg is sent from the server to convey agents' messages to the
// customer.
AgentMessage msg = 2;
}
}
message ChatAgentRequest {
oneof req {
// accept is used when an agent wants to join a customer chat
// session. It can be used to connect to a specific session (by
// ID), or to just accept the session for which the customer has
// been waiting the longest (e.g. poll a FIFO queue of sessions
// awaiting a support agent). It is possible for multiple agents
// to be connected to the same chat session.
AcceptChat accept = 1;
// msg is used to send a message to the customer. It will also be
// delivered to any other connected support agents.
string msg = 2;
// leave_session allows an agent to exit a chat session. They can
// always re-enter later by sending an accept message for that
// session ID.
Void leave_session = 3;
}
}
message AcceptChat {
string session_id = 1;
}
message ChatEntry {
google.protobuf.Timestamp date = 1;
oneof entry {
string customer_msg = 2;
AgentMessage agent_msg = 3;
}
}
message ChatAgentResponse {
oneof resp {
// accepted_session provides the detail of a chat session. The server
// sends this message after the agent has accepted a chat session.
Session accepted_session = 1;
// msg is sent by the server when the customer, or another support
// agent, sends a message in stream's current session.
ChatEntry msg = 2;
// session_ended notifies the support agent that their currently
// connected chat session has been terminated by the customer.
Void session_ended = 3;
}
}
message Session {
string session_id = 1;
string customer_name = 2;
repeated ChatEntry history = 3;
}

View File

@ -0,0 +1,465 @@
package main
import (
"fmt"
"io"
"sync"
"github.com/golang/protobuf/ptypes"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// chatServer implements the Support gRPC service, for providing
// a capability to connect customers and support agents in real-time
// chat.
type chatServer struct {
chatsBySession map[string]*session
chatsAwaitingAgent []string
lastSession int32
mu sync.RWMutex
}
type session struct {
Session
active bool
cust *listener
agents map[string]*listener
mu sync.RWMutex
}
type listener struct {
ch chan<- *ChatEntry
ctx context.Context
}
func (l *listener) send(e *ChatEntry) {
select {
case l.ch <- e:
case <-l.ctx.Done():
}
}
func (s *session) copySession() *Session {
s.mu.RLock()
defer s.mu.RUnlock()
return &Session{
SessionId: s.SessionId,
CustomerName: s.Session.CustomerName,
History: s.Session.History,
}
}
func (s *chatServer) ChatCustomer(stream Support_ChatCustomerServer) error {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
cust := getCustomer(ctx)
if cust == "" {
return status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
var sess *session
var ch chan *ChatEntry
var chCancel context.CancelFunc
cleanup := func() {
if sess != nil {
sess.mu.Lock()
sess.cust = nil
sess.mu.Unlock()
chCancel()
close(ch)
go func() {
// drain channel to prevent deadlock
for range ch {
}
}()
}
}
defer cleanup()
for {
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
switch req := req.Req.(type) {
case *ChatCustomerRequest_Init:
if sess != nil {
return status.Errorf(codes.FailedPrecondition, "already called init, currently in chat session %q", sess.SessionId)
}
sessionID := req.Init.ResumeSessionId
if sessionID == "" {
sess, ch, chCancel = s.newSession(ctx, cust)
} else if sess, ch, chCancel = s.resumeSession(ctx, cust, sessionID); sess == nil {
return status.Errorf(codes.FailedPrecondition, "cannot resume session %q; it is not an open session", sessionID)
}
err := stream.Send(&ChatCustomerResponse{
Resp: &ChatCustomerResponse_Session{
Session: sess.copySession(),
},
})
if err != nil {
return err
}
// monitor the returned channel, sending incoming agent messages down the pipe
go func() {
for {
select {
case entry, ok := <-ch:
if !ok {
return
}
if e, ok := entry.Entry.(*ChatEntry_AgentMsg); ok {
stream.Send(&ChatCustomerResponse{
Resp: &ChatCustomerResponse_Msg{
Msg: e.AgentMsg,
},
})
}
case <-ctx.Done():
return
}
}
}()
case *ChatCustomerRequest_Msg:
if sess == nil {
return status.Errorf(codes.FailedPrecondition, "never called init, no chat session for message")
}
entry := &ChatEntry{
Date: ptypes.TimestampNow(),
Entry: &ChatEntry_CustomerMsg{
CustomerMsg: req.Msg,
},
}
func() {
sess.mu.Lock()
sess.Session.History = append(sess.Session.History, entry)
sess.mu.Unlock()
sess.mu.RLock()
defer sess.mu.RUnlock()
for _, l := range sess.agents {
l.send(entry)
}
}()
case *ChatCustomerRequest_HangUp:
if sess == nil {
return status.Errorf(codes.FailedPrecondition, "never called init, no chat session to hang up")
}
s.closeSession(sess)
cleanup()
sess = nil
default:
return status.Error(codes.InvalidArgument, "unknown request type")
}
}
}
func (s *chatServer) ChatAgent(stream Support_ChatAgentServer) error {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
agent := getAgent(ctx)
if agent == "" {
return status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
var sess *session
var ch chan *ChatEntry
var chCancel context.CancelFunc
cleanup := func() {
if sess != nil {
sess.mu.Lock()
delete(sess.agents, agent)
if len(sess.agents) == 0 {
s.mu.Lock()
s.chatsAwaitingAgent = append(s.chatsAwaitingAgent, sess.SessionId)
s.mu.Unlock()
}
sess.mu.Unlock()
chCancel()
close(ch)
go func() {
// drain channel to prevent deadlock
for range ch {
}
}()
}
}
defer cleanup()
checkSession := func() {
// see if session was concurrently closed
if sess != nil {
sess.mu.RLock()
active := sess.active
sess.mu.RUnlock()
if !active {
cleanup()
sess = nil
}
}
}
for {
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
checkSession()
switch req := req.Req.(type) {
case *ChatAgentRequest_Accept:
if sess != nil {
return status.Errorf(codes.FailedPrecondition, "already called accept, currently in chat session %q", sess.SessionId)
}
sess, ch, chCancel = s.acceptSession(ctx, agent, req.Accept.SessionId)
if sess == nil {
return status.Errorf(codes.FailedPrecondition, "no session to accept")
}
err := stream.Send(&ChatAgentResponse{
Resp: &ChatAgentResponse_AcceptedSession{
AcceptedSession: sess.copySession(),
},
})
if err != nil {
return err
}
// monitor the returned channel, sending incoming agent messages down the pipe
go func() {
for {
select {
case entry, ok := <-ch:
if !ok {
return
}
if entry == nil {
stream.Send(&ChatAgentResponse{
Resp: &ChatAgentResponse_SessionEnded{
SessionEnded: Void_VOID,
},
})
continue
}
if agentMsg, ok := entry.Entry.(*ChatEntry_AgentMsg); ok {
if agentMsg.AgentMsg.AgentName == agent {
continue
}
}
stream.Send(&ChatAgentResponse{
Resp: &ChatAgentResponse_Msg{
Msg: entry,
},
})
case <-ctx.Done():
return
}
}
}()
case *ChatAgentRequest_Msg:
if sess == nil {
return status.Errorf(codes.FailedPrecondition, "never called accept, no chat session for message")
}
entry := &ChatEntry{
Date: ptypes.TimestampNow(),
Entry: &ChatEntry_AgentMsg{
AgentMsg: &AgentMessage{
AgentName: agent,
Msg: req.Msg,
},
},
}
active := true
func() {
sess.mu.Lock()
active = sess.active
if active {
sess.Session.History = append(sess.Session.History, entry)
}
sess.mu.Unlock()
if !active {
return
}
sess.mu.RLock()
defer sess.mu.RUnlock()
if sess.cust != nil {
sess.cust.send(entry)
}
for otherAgent, l := range sess.agents {
if otherAgent == agent {
continue
}
l.send(entry)
}
}()
if !active {
return status.Errorf(codes.FailedPrecondition, "customer hung up on chat session %s", sess.SessionId)
}
case *ChatAgentRequest_LeaveSession:
if sess == nil {
return status.Errorf(codes.FailedPrecondition, "never called init, no chat session to hang up")
}
s.closeSession(sess)
cleanup()
sess = nil
default:
return status.Error(codes.InvalidArgument, "unknown request type")
}
}
}
func (s *chatServer) newSession(ctx context.Context, cust string) (*session, chan *ChatEntry, context.CancelFunc) {
s.mu.Lock()
defer s.mu.Unlock()
s.lastSession++
id := fmt.Sprintf("%06d", s.lastSession)
s.chatsAwaitingAgent = append(s.chatsAwaitingAgent, id)
ch := make(chan *ChatEntry, 1)
ctx, cancel := context.WithCancel(ctx)
l := &listener{
ch: ch,
ctx: ctx,
}
sess := session{
active: true,
Session: Session{
SessionId: id,
CustomerName: cust,
},
cust: l,
}
s.chatsBySession[id] = &sess
return &sess, ch, cancel
}
func (s *chatServer) resumeSession(ctx context.Context, cust, sessionID string) (*session, chan *ChatEntry, context.CancelFunc) {
s.mu.Lock()
defer s.mu.Unlock()
sess := s.chatsBySession[sessionID]
if sess.CustomerName != cust {
// customer cannot join chat that they did not start
return nil, nil, nil
}
if !sess.active {
// chat has been closed
return nil, nil, nil
}
if sess.cust != nil {
// customer is active in the chat in another stream!
return nil, nil, nil
}
ch := make(chan *ChatEntry, 1)
ctx, cancel := context.WithCancel(ctx)
l := &listener{
ch: ch,
ctx: ctx,
}
sess.cust = l
return sess, ch, cancel
}
func (s *chatServer) closeSession(sess *session) {
active := true
func() {
sess.mu.Lock()
active = sess.active
sess.active = false
sess.mu.Unlock()
if !active {
// already closed
return
}
sess.mu.RLock()
defer sess.mu.RUnlock()
for _, l := range sess.agents {
l.send(nil)
}
}()
if !active {
// already closed
return
}
s.mu.Lock()
defer s.mu.Unlock()
delete(s.chatsBySession, sess.SessionId)
for i, id := range s.chatsAwaitingAgent {
if id == sess.SessionId {
s.chatsAwaitingAgent = append(s.chatsAwaitingAgent[:i], s.chatsAwaitingAgent[i+1:]...)
break
}
}
}
func (s *chatServer) acceptSession(ctx context.Context, agent, sessionID string) (*session, chan *ChatEntry, context.CancelFunc) {
var sess *session
func() {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.chatsAwaitingAgent) == 0 {
return
}
if sessionID == "" {
sessionID = s.chatsAwaitingAgent[0]
s.chatsAwaitingAgent = s.chatsAwaitingAgent[1:]
} else {
found := false
for i, id := range s.chatsAwaitingAgent {
if id == sessionID {
found = true
s.chatsAwaitingAgent = append(s.chatsAwaitingAgent[:i], s.chatsAwaitingAgent[i+1:]...)
break
}
}
if !found {
return
}
}
sess = s.chatsBySession[sessionID]
}()
if sess == nil {
return nil, nil, nil
}
ch := make(chan *ChatEntry, 1)
ctx, cancel := context.WithCancel(ctx)
l := &listener{
ch: ch,
ctx: ctx,
}
sess.mu.Lock()
if sess.agents == nil {
sess.agents = map[string]*listener{}
}
sess.agents[agent] = l
sess.mu.Unlock()
return sess, ch, cancel
}

View File

@ -0,0 +1,93 @@
package main
import (
"bytes"
"sync"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// In-memory database that is periodically saved to a JSON file.
type accounts struct {
AccountNumbersByCustomer map[string][]uint64
AccountsByNumber map[uint64]*account
AccountNumbers []uint64
Customers []string
LastAccountNum uint64
mu sync.RWMutex
}
type account struct {
Account
Transactions []*Transaction
mu sync.RWMutex
}
func (a *account) newTransaction(amountCents int32, desc string) (int32, error) {
a.mu.Lock()
defer a.mu.Unlock()
newBalance := a.BalanceCents + amountCents
if newBalance < 0 {
return 0, status.Errorf(codes.FailedPrecondition, "insufficient funds: cannot withdraw %s when balance is %s", dollars(amountCents), dollars(a.BalanceCents))
}
a.BalanceCents += amountCents
a.Transactions = append(a.Transactions, &Transaction{
AccountNumber: a.AccountNumber,
Date: ptypes.TimestampNow(),
AmountCents: amountCents,
SeqNumber: uint64(len(a.Transactions) + 1),
Desc: desc,
})
return a.BalanceCents, nil
}
func (t *Transaction) MarshalJSON() ([]byte, error) {
var jsm jsonpb.Marshaler
var buf bytes.Buffer
if err := jsm.Marshal(&buf, t); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (t *Transaction) UnmarshalJSON(b []byte) error {
return jsonpb.Unmarshal(bytes.NewReader(b), t)
}
func (a *accounts) Clone() *accounts {
var clone accounts
clone.AccountNumbersByCustomer = map[string][]uint64{}
clone.AccountsByNumber = map[uint64]*account{}
a.mu.RLock()
clone.Customers = a.Customers
a.mu.RUnlock()
for _, cust := range clone.Customers {
var acctNums []uint64
a.mu.RLock()
acctNums = a.AccountNumbersByCustomer[cust]
a.mu.RUnlock()
clone.AccountNumbersByCustomer[cust] = acctNums
clone.AccountNumbers = append(clone.AccountNumbers, acctNums...)
for _, acctNum := range acctNums {
a.mu.RLock()
acct := a.AccountsByNumber[acctNum]
a.mu.RUnlock()
acct.mu.RLock()
txns := acct.Transactions
acct.mu.RUnlock()
clone.AccountsByNumber[acctNum] = &account{Transactions: txns}
}
}
return &clone
}

View File

@ -0,0 +1,160 @@
package main
//go:generate protoc --go_out=plugins=grpc:./ bank.proto
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net"
"os"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)
func main() {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stderr))
port := flag.Int("port", 12345, "The port on which bankdemo gRPC server will listen.")
datafile := flag.String("datafile", "accounts.json", "The path and filename to which bank account data is saved and from which data will be loaded.")
flag.Parse()
// create the server and load initial dataset
ctx, cancel := context.WithCancel(context.Background())
s := &svr{
datafile: *datafile,
ctx: ctx,
cancel: cancel,
}
if err := s.load(); err != nil {
panic(err)
}
l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *port))
if err != nil {
panic(err)
}
grpcSvr := gRPCServer()
// Register gRPC service implementations
bankSvc := bankServer{
allAccounts: &s.allAccounts,
}
RegisterBankServer(grpcSvr, &bankSvc)
chatSvc := chatServer{
chatsBySession: map[string]*session{},
}
RegisterSupportServer(grpcSvr, &chatSvc)
go s.bgSaver()
// don't forget to include server reflection support!
reflection.Register(grpcSvr)
defer func() {
cancel()
s.flush()
}()
grpclog.Infof("server starting, listening on %v", l.Addr())
if err := grpcSvr.Serve(l); err != nil {
panic(err)
}
}
func gRPCServer() *grpc.Server {
var reqCounter uint64
return grpc.NewServer(
grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
reqID := atomic.AddUint64(&reqCounter, 1)
var client string
if p, ok := peer.FromContext(ctx); ok {
client = p.Addr.String()
} else {
client = "?"
}
grpclog.Infof("request %d started for %s from %s", reqID, info.FullMethod, client)
rsp, err := handler(ctx, req)
stat, _ := status.FromError(err)
grpclog.Infof("request %d completed for %s from %s: %v %s", reqID, info.FullMethod, client, stat.Code(), stat.Message())
return rsp, err
}),
grpc.StreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
reqID := atomic.AddUint64(&reqCounter, 1)
var client string
if p, ok := peer.FromContext(ss.Context()); ok {
client = p.Addr.String()
} else {
client = "?"
}
grpclog.Infof("request %d started for %s from %s", reqID, info.FullMethod, client)
err := handler(srv, ss)
stat, _ := status.FromError(err)
grpclog.Infof("request %d completed for %s from %s: %v %s", reqID, info.FullMethod, client, stat.Code(), stat.Message())
return err
}))
}
type svr struct {
datafile string
ctx context.Context
cancel context.CancelFunc
mu sync.Mutex
allAccounts accounts
}
func (s *svr) load() error {
accts, err := ioutil.ReadFile(s.datafile)
if err != nil && !os.IsNotExist(err) {
return err
}
if len(accts) == 0 {
s.allAccounts.AccountNumbersByCustomer = map[string][]uint64{}
s.allAccounts.AccountsByNumber = map[uint64]*account{}
} else if err := json.Unmarshal(accts, &s.allAccounts); err != nil {
return err
}
return nil
}
func (s *svr) bgSaver() {
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ticker.C:
s.flush()
case <-s.ctx.Done():
ticker.Stop()
return
}
}
}
func (s *svr) flush() {
s.mu.Lock()
defer s.mu.Unlock()
if b, err := json.Marshal(&s.allAccounts); err != nil {
grpclog.Errorf("failed to save data to %q", s.datafile)
} else if err := ioutil.WriteFile(s.datafile, b, 0666); err != nil {
grpclog.Errorf("failed to save data to %q", s.datafile)
}
}

View File

@ -0,0 +1,6 @@
# testserver
The `testserver` program is a simple server that can be used for testing RPC clients such
as `grpcurl`. It implements an RPC interface that is defined in `grpcurl`'s [testing package](https://github.com/fullstorydev/grpcurl/blob/master/testing/example.proto) and also exposes [the implementation](https://godoc.org/github.com/fullstorydev/grpcurl/testing#TestServer) that is defined in that same package. This is the same test interface and implementation that is used in unit tests for `grpcurl`.
For a possibly more interesting test server, take a look at `bankdemo`, which is a demo gRPC app that provides a more concrete RPC interface, including full-duplex bidirectional streaming methods, plus an example implementation.