add bankdemo, from Gophercon 2018 demo (#45)
This commit is contained in:
parent
1e72fa2cd1
commit
6198551381
|
|
@ -0,0 +1,9 @@
|
||||||
|
# bankdemo
|
||||||
|
|
||||||
|
The `bankdemo` program is an example gRPC server that was used to demo `grpcurl` at Gophercon 2018.
|
||||||
|
|
||||||
|
It demonstrates interesting concepts for building a gRPC server, including chat functionality (that relies on full-duplex bidirectional streams). This code was written specifically to provide an interesting concrete demonstration and, as such, should not be considered in any way production-worthy.
|
||||||
|
|
||||||
|
The demo app tracks user accounts, transactions, and balances completely in memory. Every few seconds, as well as on graceful shutdown (like when the server receives a SIGTERM or SIGINT signal), this state is saved to a file named `accounts.json`, so that the data can be restored if the process restarts.
|
||||||
|
|
||||||
|
In addition to bank account data, the server also tracks "chat sessions", for demonstrating bidirectional streams in the form of an application where customers can chat with support agents.
|
||||||
|
|
@ -0,0 +1,49 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
func getCustomer(ctx context.Context) string {
|
||||||
|
// we'll just treat the "auth token" as if it is a
|
||||||
|
// customer ID, but reject tokens that begin with "agent"
|
||||||
|
// (those are auth tokens for support agents, not customers)
|
||||||
|
cust := getAuthCode(ctx)
|
||||||
|
if strings.HasPrefix(cust, "agent") {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return cust
|
||||||
|
}
|
||||||
|
|
||||||
|
func getAgent(ctx context.Context) string {
|
||||||
|
// we'll just treat the "auth token" as if it is an agent's
|
||||||
|
// user ID, but reject tokens that don't begin with "agent"
|
||||||
|
// (those are auth tokens for customers, not support agents)
|
||||||
|
agent := getAuthCode(ctx)
|
||||||
|
if !strings.HasPrefix(agent, "agent") {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return agent
|
||||||
|
}
|
||||||
|
|
||||||
|
func getAuthCode(ctx context.Context) string {
|
||||||
|
md, ok := metadata.FromIncomingContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
vals := md.Get("authorization")
|
||||||
|
if len(vals) != 1 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
pieces := strings.SplitN(strings.ToLower(vals[0]), " ", 2)
|
||||||
|
if len(pieces) != 2 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
if pieces[0] != "token" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return pieces[1]
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,237 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/protobuf/ptypes"
|
||||||
|
"github.com/golang/protobuf/ptypes/empty"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
// bankServer implements the Bank gRPC service.
|
||||||
|
type bankServer struct {
|
||||||
|
allAccounts *accounts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *bankServer) OpenAccount(ctx context.Context, req *OpenAccountRequest) (*Account, error) {
|
||||||
|
cust := getCustomer(ctx)
|
||||||
|
if cust == "" {
|
||||||
|
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
|
||||||
|
}
|
||||||
|
switch req.Type {
|
||||||
|
case Account_CHECKING, Account_SAVING, Account_MONEY_MARKET:
|
||||||
|
if req.InitialDepositCents < 0 {
|
||||||
|
return nil, status.Errorf(codes.InvalidArgument, "initial deposit amount cannot be negative: %s", dollars(req.InitialDepositCents))
|
||||||
|
}
|
||||||
|
case Account_LINE_OF_CREDIT, Account_LOAN, Account_EQUITIES:
|
||||||
|
if req.InitialDepositCents != 0 {
|
||||||
|
return nil, status.Errorf(codes.InvalidArgument, "initial deposit amount must be zero for account type %v: %s", req.Type, dollars(req.InitialDepositCents))
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return nil, status.Errorf(codes.InvalidArgument, "invalid account type: %v", req.Type)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.allAccounts.openAccount(cust, req.Type, req.InitialDepositCents), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *bankServer) CloseAccount(ctx context.Context, req *CloseAccountRequest) (*empty.Empty, error) {
|
||||||
|
cust := getCustomer(ctx)
|
||||||
|
if cust == "" {
|
||||||
|
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.allAccounts.closeAccount(cust, req.AccountNumber); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &empty.Empty{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *bankServer) GetAccounts(ctx context.Context, _ *empty.Empty) (*GetAccountsResponse, error) {
|
||||||
|
cust := getCustomer(ctx)
|
||||||
|
if cust == "" {
|
||||||
|
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
accounts := s.allAccounts.getAllAccounts(cust)
|
||||||
|
return &GetAccountsResponse{Accounts: accounts}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *bankServer) GetTransactions(req *GetTransactionsRequest, stream Bank_GetTransactionsServer) error {
|
||||||
|
cust := getCustomer(stream.Context())
|
||||||
|
if cust == "" {
|
||||||
|
return status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
acct, err := s.allAccounts.getAccount(cust, req.AccountNumber)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var start, end time.Time
|
||||||
|
if req.Start != nil {
|
||||||
|
start, err = ptypes.Timestamp(req.Start)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if req.End != nil {
|
||||||
|
end, err = ptypes.Timestamp(req.End)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
end = time.Date(9999, 12, 31, 23, 59, 59, 999999999, time.Local)
|
||||||
|
}
|
||||||
|
|
||||||
|
txns := acct.getTransactions()
|
||||||
|
for _, txn := range txns {
|
||||||
|
t, err := ptypes.Timestamp(txn.Date)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if (t.After(start) || t.Equal(start)) &&
|
||||||
|
(t.Before(end) || t.Equal(end)) {
|
||||||
|
|
||||||
|
if err := stream.Send(txn); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *bankServer) Deposit(ctx context.Context, req *DepositRequest) (*BalanceResponse, error) {
|
||||||
|
cust := getCustomer(ctx)
|
||||||
|
if cust == "" {
|
||||||
|
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
switch req.Source {
|
||||||
|
case DepositRequest_ACH, DepositRequest_CASH, DepositRequest_CHECK, DepositRequest_WIRE:
|
||||||
|
// ok
|
||||||
|
default:
|
||||||
|
return nil, status.Errorf(codes.InvalidArgument, "unknown deposit source: %v", req.Source)
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.AmountCents <= 0 {
|
||||||
|
return nil, status.Errorf(codes.InvalidArgument, "deposit amount cannot be non-positive: %s", dollars(req.AmountCents))
|
||||||
|
}
|
||||||
|
|
||||||
|
desc := fmt.Sprintf("%v deposit", req.Source)
|
||||||
|
if req.Desc != "" {
|
||||||
|
desc = fmt.Sprintf("%s: %s", desc, req.Desc)
|
||||||
|
}
|
||||||
|
acct, err := s.allAccounts.getAccount(cust, req.AccountNumber)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
newBalance, err := acct.newTransaction(req.AmountCents, desc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &BalanceResponse{
|
||||||
|
AccountNumber: req.AccountNumber,
|
||||||
|
BalanceCents: newBalance,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *bankServer) Withdraw(ctx context.Context, req *WithdrawRequest) (*BalanceResponse, error) {
|
||||||
|
cust := getCustomer(ctx)
|
||||||
|
if cust == "" {
|
||||||
|
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.AmountCents >= 0 {
|
||||||
|
return nil, status.Errorf(codes.InvalidArgument, "withdrawal amount cannot be non-negative: %s", dollars(req.AmountCents))
|
||||||
|
}
|
||||||
|
|
||||||
|
acct, err := s.allAccounts.getAccount(cust, req.AccountNumber)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
newBalance, err := acct.newTransaction(req.AmountCents, req.Desc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &BalanceResponse{
|
||||||
|
AccountNumber: req.AccountNumber,
|
||||||
|
BalanceCents: newBalance,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *bankServer) Transfer(ctx context.Context, req *TransferRequest) (*TransferResponse, error) {
|
||||||
|
cust := getCustomer(ctx)
|
||||||
|
if cust == "" {
|
||||||
|
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.AmountCents <= 0 {
|
||||||
|
return nil, status.Errorf(codes.InvalidArgument, "transfer amount cannot be non-positive: %s", dollars(req.AmountCents))
|
||||||
|
}
|
||||||
|
|
||||||
|
var srcAcct *account
|
||||||
|
var srcDesc string
|
||||||
|
switch src := req.Source.(type) {
|
||||||
|
case *TransferRequest_ExternalSource:
|
||||||
|
srcDesc = fmt.Sprintf("ACH %09d:%06d", src.ExternalSource.AchRoutingNumber, src.ExternalSource.AchAccountNumber)
|
||||||
|
if src.ExternalSource.AchAccountNumber == 0 || src.ExternalSource.AchRoutingNumber == 0 {
|
||||||
|
return nil, status.Errorf(codes.InvalidArgument, "external source routing and account numbers cannot be zero: %s", srcDesc)
|
||||||
|
}
|
||||||
|
case *TransferRequest_SourceAccountNumber:
|
||||||
|
srcDesc = fmt.Sprintf("account %06d", src.SourceAccountNumber)
|
||||||
|
var err error
|
||||||
|
if srcAcct, err = s.allAccounts.getAccount(cust, src.SourceAccountNumber); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var destAcct *account
|
||||||
|
var destDesc string
|
||||||
|
switch dest := req.Dest.(type) {
|
||||||
|
case *TransferRequest_ExternalDest:
|
||||||
|
destDesc = fmt.Sprintf("ACH %09d:%06d", dest.ExternalDest.AchRoutingNumber, dest.ExternalDest.AchAccountNumber)
|
||||||
|
if dest.ExternalDest.AchAccountNumber == 0 || dest.ExternalDest.AchRoutingNumber == 0 {
|
||||||
|
return nil, status.Errorf(codes.InvalidArgument, "external source routing and account numbers cannot be zero: %s", destDesc)
|
||||||
|
}
|
||||||
|
case *TransferRequest_DestAccountNumber:
|
||||||
|
destDesc = fmt.Sprintf("account %06d", dest.DestAccountNumber)
|
||||||
|
var err error
|
||||||
|
if destAcct, err = s.allAccounts.getAccount(cust, dest.DestAccountNumber); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var srcBalance int32
|
||||||
|
if srcAcct != nil {
|
||||||
|
desc := fmt.Sprintf("transfer to %s", destDesc)
|
||||||
|
if req.Desc != "" {
|
||||||
|
desc = fmt.Sprintf("%s: %s", desc, req.Desc)
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
if srcBalance, err = srcAcct.newTransaction(-req.AmountCents, desc); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var destBalance int32
|
||||||
|
if destAcct != nil {
|
||||||
|
desc := fmt.Sprintf("transfer from %s", srcDesc)
|
||||||
|
if req.Desc != "" {
|
||||||
|
desc = fmt.Sprintf("%s: %s", desc, req.Desc)
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
if destBalance, err = destAcct.newTransaction(req.AmountCents, desc); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &TransferResponse{
|
||||||
|
SrcAccountNumber: req.GetSourceAccountNumber(),
|
||||||
|
SrcBalanceCents: srcBalance,
|
||||||
|
DestAccountNumber: req.GetDestAccountNumber(),
|
||||||
|
DestBalanceCents: destBalance,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -0,0 +1,125 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
option go_package = "main";
|
||||||
|
|
||||||
|
import "google/protobuf/empty.proto";
|
||||||
|
import "google/protobuf/timestamp.proto";
|
||||||
|
|
||||||
|
// Bank provides operations for interacting with bank accounts. All
|
||||||
|
// operations operate for the authenticated user (identified via an
|
||||||
|
// "authorization" request header, where the type is "token" and the
|
||||||
|
// credential is the customer's ID).
|
||||||
|
service Bank {
|
||||||
|
// OpenAccount creates an account with the type and given initial deposit
|
||||||
|
// as its balance.
|
||||||
|
rpc OpenAccount(OpenAccountRequest) returns (Account);
|
||||||
|
// CloseAccount closes the indicated account. An account can only be
|
||||||
|
// closed if its balance is zero.
|
||||||
|
rpc CloseAccount(CloseAccountRequest) returns (google.protobuf.Empty);
|
||||||
|
// GetAccounts lists all accounts for the current customer.
|
||||||
|
rpc GetAccounts(google.protobuf.Empty) returns (GetAccountsResponse);
|
||||||
|
// GetTransactions streams all transactions that match the given criteria.
|
||||||
|
// If the given start date is not specified, transactions since beginning
|
||||||
|
// of time are included. Similarly, if the given end date is not specified,
|
||||||
|
// transactions all the way to the presnet are included.
|
||||||
|
rpc GetTransactions(GetTransactionsRequest) returns (stream Transaction);
|
||||||
|
// Deposit increases the balance of an account by depositing funds into it.
|
||||||
|
rpc Deposit(DepositRequest) returns (BalanceResponse);
|
||||||
|
// Withdraw decreases the balance of an account by withdrawing funds from it.
|
||||||
|
rpc Withdraw(WithdrawRequest) returns (BalanceResponse);
|
||||||
|
// Transfer moves money from one account to another. The source and destination
|
||||||
|
// accounts can be with this bank (e.g. "local" account numbers) or can be
|
||||||
|
// external accounts, identified by their ACH routing and account numbers.
|
||||||
|
rpc Transfer(TransferRequest) returns (TransferResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
message OpenAccountRequest {
|
||||||
|
int32 initial_deposit_cents = 1;
|
||||||
|
Account.Type type = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CloseAccountRequest {
|
||||||
|
uint64 account_number = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetAccountsResponse {
|
||||||
|
repeated Account accounts = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message Account {
|
||||||
|
uint64 account_number = 1;
|
||||||
|
enum Type {
|
||||||
|
UNKNOWN = 0;
|
||||||
|
CHECKING = 1;
|
||||||
|
SAVING = 2;
|
||||||
|
MONEY_MARKET = 3;
|
||||||
|
LINE_OF_CREDIT = 4;
|
||||||
|
LOAN = 5;
|
||||||
|
EQUITIES = 6;
|
||||||
|
}
|
||||||
|
Type type = 2;
|
||||||
|
int32 balance_cents = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetTransactionsRequest {
|
||||||
|
uint64 account_number = 1;
|
||||||
|
google.protobuf.Timestamp start = 2;
|
||||||
|
google.protobuf.Timestamp end = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message Transaction {
|
||||||
|
uint64 account_number = 1;
|
||||||
|
uint64 seq_number = 2;
|
||||||
|
google.protobuf.Timestamp date = 3;
|
||||||
|
int32 amount_cents = 4;
|
||||||
|
string desc = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
message DepositRequest {
|
||||||
|
uint64 account_number = 1;
|
||||||
|
int32 amount_cents = 2;
|
||||||
|
enum Source {
|
||||||
|
UNKNOWN = 0;
|
||||||
|
CASH = 1;
|
||||||
|
CHECK = 2;
|
||||||
|
ACH = 3;
|
||||||
|
WIRE = 4;
|
||||||
|
}
|
||||||
|
Source source = 3;
|
||||||
|
string desc = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message BalanceResponse {
|
||||||
|
uint64 account_number = 1;
|
||||||
|
int32 balance_cents = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message WithdrawRequest {
|
||||||
|
uint64 account_number = 1;
|
||||||
|
int32 amount_cents = 2;
|
||||||
|
string desc = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message TransferRequest {
|
||||||
|
message ExternalAccount {
|
||||||
|
uint64 ach_routing_number = 1;
|
||||||
|
uint64 ach_account_number = 2;
|
||||||
|
}
|
||||||
|
oneof source {
|
||||||
|
uint64 source_account_number = 1;
|
||||||
|
ExternalAccount external_source = 2;
|
||||||
|
}
|
||||||
|
oneof dest {
|
||||||
|
uint64 dest_account_number = 3;
|
||||||
|
ExternalAccount external_dest = 4;
|
||||||
|
}
|
||||||
|
int32 amount_cents = 5;
|
||||||
|
string desc = 6;
|
||||||
|
}
|
||||||
|
|
||||||
|
message TransferResponse {
|
||||||
|
uint64 src_account_number = 1;
|
||||||
|
int32 src_balance_cents = 2;
|
||||||
|
uint64 dest_account_number = 3;
|
||||||
|
int32 dest_balance_cents = 4;
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,465 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/golang/protobuf/ptypes"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
// chatServer implements the Support gRPC service, for providing
|
||||||
|
// a capability to connect customers and support agents in real-time
|
||||||
|
// chat.
|
||||||
|
type chatServer struct {
|
||||||
|
chatsBySession map[string]*session
|
||||||
|
chatsAwaitingAgent []string
|
||||||
|
lastSession int32
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type session struct {
|
||||||
|
Session
|
||||||
|
active bool
|
||||||
|
cust *listener
|
||||||
|
agents map[string]*listener
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type listener struct {
|
||||||
|
ch chan<- *ChatEntry
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *listener) send(e *ChatEntry) {
|
||||||
|
select {
|
||||||
|
case l.ch <- e:
|
||||||
|
case <-l.ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *session) copySession() *Session {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return &Session{
|
||||||
|
SessionId: s.SessionId,
|
||||||
|
CustomerName: s.Session.CustomerName,
|
||||||
|
History: s.Session.History,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *chatServer) ChatCustomer(stream Support_ChatCustomerServer) error {
|
||||||
|
ctx, cancel := context.WithCancel(stream.Context())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
cust := getCustomer(ctx)
|
||||||
|
if cust == "" {
|
||||||
|
return status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
var sess *session
|
||||||
|
var ch chan *ChatEntry
|
||||||
|
var chCancel context.CancelFunc
|
||||||
|
cleanup := func() {
|
||||||
|
if sess != nil {
|
||||||
|
sess.mu.Lock()
|
||||||
|
sess.cust = nil
|
||||||
|
sess.mu.Unlock()
|
||||||
|
chCancel()
|
||||||
|
close(ch)
|
||||||
|
go func() {
|
||||||
|
// drain channel to prevent deadlock
|
||||||
|
for range ch {
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer cleanup()
|
||||||
|
for {
|
||||||
|
req, err := stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch req := req.Req.(type) {
|
||||||
|
case *ChatCustomerRequest_Init:
|
||||||
|
if sess != nil {
|
||||||
|
return status.Errorf(codes.FailedPrecondition, "already called init, currently in chat session %q", sess.SessionId)
|
||||||
|
}
|
||||||
|
sessionID := req.Init.ResumeSessionId
|
||||||
|
if sessionID == "" {
|
||||||
|
sess, ch, chCancel = s.newSession(ctx, cust)
|
||||||
|
} else if sess, ch, chCancel = s.resumeSession(ctx, cust, sessionID); sess == nil {
|
||||||
|
return status.Errorf(codes.FailedPrecondition, "cannot resume session %q; it is not an open session", sessionID)
|
||||||
|
}
|
||||||
|
err := stream.Send(&ChatCustomerResponse{
|
||||||
|
Resp: &ChatCustomerResponse_Session{
|
||||||
|
Session: sess.copySession(),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// monitor the returned channel, sending incoming agent messages down the pipe
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case entry, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if e, ok := entry.Entry.(*ChatEntry_AgentMsg); ok {
|
||||||
|
stream.Send(&ChatCustomerResponse{
|
||||||
|
Resp: &ChatCustomerResponse_Msg{
|
||||||
|
Msg: e.AgentMsg,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
case *ChatCustomerRequest_Msg:
|
||||||
|
if sess == nil {
|
||||||
|
return status.Errorf(codes.FailedPrecondition, "never called init, no chat session for message")
|
||||||
|
}
|
||||||
|
|
||||||
|
entry := &ChatEntry{
|
||||||
|
Date: ptypes.TimestampNow(),
|
||||||
|
Entry: &ChatEntry_CustomerMsg{
|
||||||
|
CustomerMsg: req.Msg,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
func() {
|
||||||
|
sess.mu.Lock()
|
||||||
|
sess.Session.History = append(sess.Session.History, entry)
|
||||||
|
sess.mu.Unlock()
|
||||||
|
|
||||||
|
sess.mu.RLock()
|
||||||
|
defer sess.mu.RUnlock()
|
||||||
|
for _, l := range sess.agents {
|
||||||
|
l.send(entry)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
case *ChatCustomerRequest_HangUp:
|
||||||
|
if sess == nil {
|
||||||
|
return status.Errorf(codes.FailedPrecondition, "never called init, no chat session to hang up")
|
||||||
|
}
|
||||||
|
s.closeSession(sess)
|
||||||
|
cleanup()
|
||||||
|
sess = nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
return status.Error(codes.InvalidArgument, "unknown request type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *chatServer) ChatAgent(stream Support_ChatAgentServer) error {
|
||||||
|
ctx, cancel := context.WithCancel(stream.Context())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
agent := getAgent(ctx)
|
||||||
|
if agent == "" {
|
||||||
|
return status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
var sess *session
|
||||||
|
var ch chan *ChatEntry
|
||||||
|
var chCancel context.CancelFunc
|
||||||
|
cleanup := func() {
|
||||||
|
if sess != nil {
|
||||||
|
sess.mu.Lock()
|
||||||
|
delete(sess.agents, agent)
|
||||||
|
if len(sess.agents) == 0 {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.chatsAwaitingAgent = append(s.chatsAwaitingAgent, sess.SessionId)
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
sess.mu.Unlock()
|
||||||
|
chCancel()
|
||||||
|
close(ch)
|
||||||
|
go func() {
|
||||||
|
// drain channel to prevent deadlock
|
||||||
|
for range ch {
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
checkSession := func() {
|
||||||
|
// see if session was concurrently closed
|
||||||
|
if sess != nil {
|
||||||
|
sess.mu.RLock()
|
||||||
|
active := sess.active
|
||||||
|
sess.mu.RUnlock()
|
||||||
|
if !active {
|
||||||
|
cleanup()
|
||||||
|
sess = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
req, err := stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
checkSession()
|
||||||
|
|
||||||
|
switch req := req.Req.(type) {
|
||||||
|
case *ChatAgentRequest_Accept:
|
||||||
|
if sess != nil {
|
||||||
|
return status.Errorf(codes.FailedPrecondition, "already called accept, currently in chat session %q", sess.SessionId)
|
||||||
|
}
|
||||||
|
sess, ch, chCancel = s.acceptSession(ctx, agent, req.Accept.SessionId)
|
||||||
|
if sess == nil {
|
||||||
|
return status.Errorf(codes.FailedPrecondition, "no session to accept")
|
||||||
|
}
|
||||||
|
err := stream.Send(&ChatAgentResponse{
|
||||||
|
Resp: &ChatAgentResponse_AcceptedSession{
|
||||||
|
AcceptedSession: sess.copySession(),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// monitor the returned channel, sending incoming agent messages down the pipe
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case entry, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry == nil {
|
||||||
|
stream.Send(&ChatAgentResponse{
|
||||||
|
Resp: &ChatAgentResponse_SessionEnded{
|
||||||
|
SessionEnded: Void_VOID,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if agentMsg, ok := entry.Entry.(*ChatEntry_AgentMsg); ok {
|
||||||
|
if agentMsg.AgentMsg.AgentName == agent {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stream.Send(&ChatAgentResponse{
|
||||||
|
Resp: &ChatAgentResponse_Msg{
|
||||||
|
Msg: entry,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
case *ChatAgentRequest_Msg:
|
||||||
|
if sess == nil {
|
||||||
|
return status.Errorf(codes.FailedPrecondition, "never called accept, no chat session for message")
|
||||||
|
}
|
||||||
|
|
||||||
|
entry := &ChatEntry{
|
||||||
|
Date: ptypes.TimestampNow(),
|
||||||
|
Entry: &ChatEntry_AgentMsg{
|
||||||
|
AgentMsg: &AgentMessage{
|
||||||
|
AgentName: agent,
|
||||||
|
Msg: req.Msg,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
active := true
|
||||||
|
func() {
|
||||||
|
sess.mu.Lock()
|
||||||
|
active = sess.active
|
||||||
|
if active {
|
||||||
|
sess.Session.History = append(sess.Session.History, entry)
|
||||||
|
}
|
||||||
|
sess.mu.Unlock()
|
||||||
|
|
||||||
|
if !active {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sess.mu.RLock()
|
||||||
|
defer sess.mu.RUnlock()
|
||||||
|
if sess.cust != nil {
|
||||||
|
sess.cust.send(entry)
|
||||||
|
}
|
||||||
|
for otherAgent, l := range sess.agents {
|
||||||
|
if otherAgent == agent {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
l.send(entry)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if !active {
|
||||||
|
return status.Errorf(codes.FailedPrecondition, "customer hung up on chat session %s", sess.SessionId)
|
||||||
|
}
|
||||||
|
|
||||||
|
case *ChatAgentRequest_LeaveSession:
|
||||||
|
if sess == nil {
|
||||||
|
return status.Errorf(codes.FailedPrecondition, "never called init, no chat session to hang up")
|
||||||
|
}
|
||||||
|
s.closeSession(sess)
|
||||||
|
cleanup()
|
||||||
|
sess = nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
return status.Error(codes.InvalidArgument, "unknown request type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *chatServer) newSession(ctx context.Context, cust string) (*session, chan *ChatEntry, context.CancelFunc) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.lastSession++
|
||||||
|
id := fmt.Sprintf("%06d", s.lastSession)
|
||||||
|
s.chatsAwaitingAgent = append(s.chatsAwaitingAgent, id)
|
||||||
|
|
||||||
|
ch := make(chan *ChatEntry, 1)
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
l := &listener{
|
||||||
|
ch: ch,
|
||||||
|
ctx: ctx,
|
||||||
|
}
|
||||||
|
sess := session{
|
||||||
|
active: true,
|
||||||
|
Session: Session{
|
||||||
|
SessionId: id,
|
||||||
|
CustomerName: cust,
|
||||||
|
},
|
||||||
|
cust: l,
|
||||||
|
}
|
||||||
|
s.chatsBySession[id] = &sess
|
||||||
|
|
||||||
|
return &sess, ch, cancel
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *chatServer) resumeSession(ctx context.Context, cust, sessionID string) (*session, chan *ChatEntry, context.CancelFunc) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
sess := s.chatsBySession[sessionID]
|
||||||
|
if sess.CustomerName != cust {
|
||||||
|
// customer cannot join chat that they did not start
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
if !sess.active {
|
||||||
|
// chat has been closed
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
if sess.cust != nil {
|
||||||
|
// customer is active in the chat in another stream!
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan *ChatEntry, 1)
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
l := &listener{
|
||||||
|
ch: ch,
|
||||||
|
ctx: ctx,
|
||||||
|
}
|
||||||
|
sess.cust = l
|
||||||
|
return sess, ch, cancel
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *chatServer) closeSession(sess *session) {
|
||||||
|
active := true
|
||||||
|
func() {
|
||||||
|
sess.mu.Lock()
|
||||||
|
active = sess.active
|
||||||
|
sess.active = false
|
||||||
|
sess.mu.Unlock()
|
||||||
|
|
||||||
|
if !active {
|
||||||
|
// already closed
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sess.mu.RLock()
|
||||||
|
defer sess.mu.RUnlock()
|
||||||
|
for _, l := range sess.agents {
|
||||||
|
l.send(nil)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if !active {
|
||||||
|
// already closed
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
delete(s.chatsBySession, sess.SessionId)
|
||||||
|
for i, id := range s.chatsAwaitingAgent {
|
||||||
|
if id == sess.SessionId {
|
||||||
|
s.chatsAwaitingAgent = append(s.chatsAwaitingAgent[:i], s.chatsAwaitingAgent[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *chatServer) acceptSession(ctx context.Context, agent, sessionID string) (*session, chan *ChatEntry, context.CancelFunc) {
|
||||||
|
var sess *session
|
||||||
|
func() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if len(s.chatsAwaitingAgent) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if sessionID == "" {
|
||||||
|
sessionID = s.chatsAwaitingAgent[0]
|
||||||
|
s.chatsAwaitingAgent = s.chatsAwaitingAgent[1:]
|
||||||
|
} else {
|
||||||
|
found := false
|
||||||
|
for i, id := range s.chatsAwaitingAgent {
|
||||||
|
if id == sessionID {
|
||||||
|
found = true
|
||||||
|
s.chatsAwaitingAgent = append(s.chatsAwaitingAgent[:i], s.chatsAwaitingAgent[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sess = s.chatsBySession[sessionID]
|
||||||
|
}()
|
||||||
|
if sess == nil {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
ch := make(chan *ChatEntry, 1)
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
l := &listener{
|
||||||
|
ch: ch,
|
||||||
|
ctx: ctx,
|
||||||
|
}
|
||||||
|
sess.mu.Lock()
|
||||||
|
if sess.agents == nil {
|
||||||
|
sess.agents = map[string]*listener{}
|
||||||
|
}
|
||||||
|
sess.agents[agent] = l
|
||||||
|
sess.mu.Unlock()
|
||||||
|
return sess, ch, cancel
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,189 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/golang/protobuf/jsonpb"
|
||||||
|
"github.com/golang/protobuf/ptypes"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
// In-memory database that is periodically saved to a JSON file.
|
||||||
|
|
||||||
|
type accounts struct {
|
||||||
|
AccountNumbersByCustomer map[string][]uint64
|
||||||
|
AccountsByNumber map[uint64]*account
|
||||||
|
AccountNumbers []uint64
|
||||||
|
Customers []string
|
||||||
|
LastAccountNum uint64
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *accounts) openAccount(customer string, accountType Account_Type, initialBalanceCents int32) *Account {
|
||||||
|
a.mu.Lock()
|
||||||
|
defer a.mu.Unlock()
|
||||||
|
|
||||||
|
accountNums, ok := a.AccountNumbersByCustomer[customer]
|
||||||
|
if !ok {
|
||||||
|
// no accounts for this customer? it's a new customer
|
||||||
|
a.Customers = append(a.Customers, customer)
|
||||||
|
}
|
||||||
|
num := a.LastAccountNum + 1
|
||||||
|
a.LastAccountNum = num
|
||||||
|
a.AccountNumbers = append(a.AccountNumbers, num)
|
||||||
|
accountNums = append(accountNums, num)
|
||||||
|
a.AccountNumbersByCustomer[customer] = accountNums
|
||||||
|
var acct account
|
||||||
|
acct.AccountNumber = num
|
||||||
|
acct.BalanceCents = initialBalanceCents
|
||||||
|
acct.Transactions = append(acct.Transactions, &Transaction{
|
||||||
|
AccountNumber: num,
|
||||||
|
SeqNumber: 1,
|
||||||
|
Date: ptypes.TimestampNow(),
|
||||||
|
AmountCents: initialBalanceCents,
|
||||||
|
Desc: "initial deposit",
|
||||||
|
})
|
||||||
|
a.AccountsByNumber[num] = &acct
|
||||||
|
return &acct.Account
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *accounts) closeAccount(customer string, accountNumber uint64) error {
|
||||||
|
a.mu.Lock()
|
||||||
|
defer a.mu.Unlock()
|
||||||
|
|
||||||
|
acctNums := a.AccountNumbersByCustomer[customer]
|
||||||
|
found := -1
|
||||||
|
for i, num := range acctNums {
|
||||||
|
if num == accountNumber {
|
||||||
|
found = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if found == -1 {
|
||||||
|
return status.Errorf(codes.NotFound, "you have no account numbered %d", accountNumber)
|
||||||
|
}
|
||||||
|
|
||||||
|
acct := a.AccountsByNumber[accountNumber]
|
||||||
|
if acct.BalanceCents != 0 {
|
||||||
|
return status.Errorf(codes.FailedPrecondition, "account %d cannot be closed because it has a non-zero balance: %s", accountNumber, dollars(acct.BalanceCents))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, num := range a.AccountNumbers {
|
||||||
|
if num == accountNumber {
|
||||||
|
a.AccountNumbers = append(a.AccountNumbers[:i], a.AccountNumbers[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
a.AccountNumbersByCustomer[customer] = append(acctNums[:found], acctNums[found+1:]...)
|
||||||
|
delete(a.AccountsByNumber, accountNumber)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *accounts) getAccount(customer string, accountNumber uint64) (*account, error) {
|
||||||
|
a.mu.RLock()
|
||||||
|
defer a.mu.RUnlock()
|
||||||
|
acctNums := a.AccountNumbersByCustomer[customer]
|
||||||
|
for _, num := range acctNums {
|
||||||
|
if num == accountNumber {
|
||||||
|
return a.AccountsByNumber[num], nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, status.Errorf(codes.NotFound, "you have no account numbered %d", accountNumber)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *accounts) getAllAccounts(customer string) []*Account {
|
||||||
|
a.mu.RLock()
|
||||||
|
defer a.mu.RUnlock()
|
||||||
|
|
||||||
|
accountNums := a.AccountNumbersByCustomer[customer]
|
||||||
|
var accounts []*Account
|
||||||
|
for _, num := range accountNums {
|
||||||
|
accounts = append(accounts, &a.AccountsByNumber[num].Account)
|
||||||
|
}
|
||||||
|
return accounts
|
||||||
|
}
|
||||||
|
|
||||||
|
type account struct {
|
||||||
|
Account
|
||||||
|
Transactions []*Transaction
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *account) getTransactions() []*Transaction {
|
||||||
|
a.mu.RLock()
|
||||||
|
defer a.mu.RUnlock()
|
||||||
|
return a.Transactions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *account) newTransaction(amountCents int32, desc string) (newBalance int32, err error) {
|
||||||
|
a.mu.Lock()
|
||||||
|
defer a.mu.Unlock()
|
||||||
|
bal := a.BalanceCents + amountCents
|
||||||
|
if bal < 0 {
|
||||||
|
return 0, status.Errorf(codes.FailedPrecondition, "insufficient funds: cannot withdraw %s when balance is %s", dollars(amountCents), dollars(a.BalanceCents))
|
||||||
|
}
|
||||||
|
a.BalanceCents = bal
|
||||||
|
a.Transactions = append(a.Transactions, &Transaction{
|
||||||
|
AccountNumber: a.AccountNumber,
|
||||||
|
Date: ptypes.TimestampNow(),
|
||||||
|
AmountCents: amountCents,
|
||||||
|
SeqNumber: uint64(len(a.Transactions) + 1),
|
||||||
|
Desc: desc,
|
||||||
|
})
|
||||||
|
return bal, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Transaction) MarshalJSON() ([]byte, error) {
|
||||||
|
var jsm jsonpb.Marshaler
|
||||||
|
var buf bytes.Buffer
|
||||||
|
if err := jsm.Marshal(&buf, t); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return buf.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Transaction) UnmarshalJSON(b []byte) error {
|
||||||
|
return jsonpb.Unmarshal(bytes.NewReader(b), t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *accounts) clone() *accounts {
|
||||||
|
var clone accounts
|
||||||
|
clone.AccountNumbersByCustomer = map[string][]uint64{}
|
||||||
|
clone.AccountsByNumber = map[uint64]*account{}
|
||||||
|
|
||||||
|
a.mu.RLock()
|
||||||
|
clone.Customers = a.Customers
|
||||||
|
a.mu.RUnlock()
|
||||||
|
|
||||||
|
for _, cust := range clone.Customers {
|
||||||
|
var acctNums []uint64
|
||||||
|
a.mu.RLock()
|
||||||
|
acctNums = a.AccountNumbersByCustomer[cust]
|
||||||
|
a.mu.RUnlock()
|
||||||
|
|
||||||
|
clone.AccountNumbersByCustomer[cust] = acctNums
|
||||||
|
clone.AccountNumbers = append(clone.AccountNumbers, acctNums...)
|
||||||
|
|
||||||
|
for _, acctNum := range acctNums {
|
||||||
|
a.mu.RLock()
|
||||||
|
acct := a.AccountsByNumber[acctNum]
|
||||||
|
a.mu.RUnlock()
|
||||||
|
|
||||||
|
acct.mu.RLock()
|
||||||
|
txns := acct.Transactions
|
||||||
|
acct.mu.RUnlock()
|
||||||
|
|
||||||
|
clone.AccountsByNumber[acctNum] = &account{Transactions: txns}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &clone
|
||||||
|
}
|
||||||
|
|
||||||
|
func dollars(amountCents int32) string {
|
||||||
|
return fmt.Sprintf("$%02f", float64(amountCents)/100)
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,168 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
//go:generate protoc --go_out=plugins=grpc:./ bank.proto support.proto
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"sync/atomic"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/grpclog"
|
||||||
|
"google.golang.org/grpc/peer"
|
||||||
|
"google.golang.org/grpc/reflection"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stderr))
|
||||||
|
|
||||||
|
port := flag.Int("port", 12345, "The port on which bankdemo gRPC server will listen.")
|
||||||
|
datafile := flag.String("datafile", "accounts.json", "The path and filename to which bank account data is saved and from which data will be loaded.")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
// create the server and load initial dataset
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
s := &svr{
|
||||||
|
datafile: *datafile,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
}
|
||||||
|
if err := s.load(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *port))
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
grpcSvr := gRPCServer()
|
||||||
|
|
||||||
|
// Register gRPC service implementations
|
||||||
|
bankSvc := bankServer{
|
||||||
|
allAccounts: &s.allAccounts,
|
||||||
|
}
|
||||||
|
RegisterBankServer(grpcSvr, &bankSvc)
|
||||||
|
|
||||||
|
chatSvc := chatServer{
|
||||||
|
chatsBySession: map[string]*session{},
|
||||||
|
}
|
||||||
|
RegisterSupportServer(grpcSvr, &chatSvc)
|
||||||
|
|
||||||
|
go s.bgSaver()
|
||||||
|
|
||||||
|
// don't forget to include server reflection support!
|
||||||
|
reflection.Register(grpcSvr)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
cancel()
|
||||||
|
s.flush()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// trap SIGINT / SIGTERM to exit cleanly
|
||||||
|
c := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(c, syscall.SIGINT)
|
||||||
|
signal.Notify(c, syscall.SIGTERM)
|
||||||
|
go func() {
|
||||||
|
<-c
|
||||||
|
fmt.Println("Shutting down...")
|
||||||
|
grpcSvr.GracefulStop()
|
||||||
|
}()
|
||||||
|
|
||||||
|
grpclog.Infof("server starting, listening on %v", l.Addr())
|
||||||
|
if err := grpcSvr.Serve(l); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func gRPCServer() *grpc.Server {
|
||||||
|
var reqCounter uint64
|
||||||
|
return grpc.NewServer(
|
||||||
|
grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||||
|
reqID := atomic.AddUint64(&reqCounter, 1)
|
||||||
|
var client string
|
||||||
|
if p, ok := peer.FromContext(ctx); ok {
|
||||||
|
client = p.Addr.String()
|
||||||
|
} else {
|
||||||
|
client = "?"
|
||||||
|
}
|
||||||
|
grpclog.Infof("request %d started for %s from %s", reqID, info.FullMethod, client)
|
||||||
|
|
||||||
|
rsp, err := handler(ctx, req)
|
||||||
|
|
||||||
|
stat, _ := status.FromError(err)
|
||||||
|
grpclog.Infof("request %d completed for %s from %s: %v %s", reqID, info.FullMethod, client, stat.Code(), stat.Message())
|
||||||
|
return rsp, err
|
||||||
|
|
||||||
|
}),
|
||||||
|
grpc.StreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||||
|
reqID := atomic.AddUint64(&reqCounter, 1)
|
||||||
|
var client string
|
||||||
|
if p, ok := peer.FromContext(ss.Context()); ok {
|
||||||
|
client = p.Addr.String()
|
||||||
|
} else {
|
||||||
|
client = "?"
|
||||||
|
}
|
||||||
|
grpclog.Infof("request %d started for %s from %s", reqID, info.FullMethod, client)
|
||||||
|
|
||||||
|
err := handler(srv, ss)
|
||||||
|
|
||||||
|
stat, _ := status.FromError(err)
|
||||||
|
grpclog.Infof("request %d completed for %s from %s: %v %s", reqID, info.FullMethod, client, stat.Code(), stat.Message())
|
||||||
|
return err
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
type svr struct {
|
||||||
|
datafile string
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
allAccounts accounts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *svr) load() error {
|
||||||
|
accts, err := ioutil.ReadFile(s.datafile)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(accts) == 0 {
|
||||||
|
s.allAccounts.AccountNumbersByCustomer = map[string][]uint64{}
|
||||||
|
s.allAccounts.AccountsByNumber = map[uint64]*account{}
|
||||||
|
} else if err := json.Unmarshal(accts, &s.allAccounts); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *svr) bgSaver() {
|
||||||
|
ticker := time.NewTicker(5 * time.Second)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
s.flush()
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
ticker.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *svr) flush() {
|
||||||
|
accounts := s.allAccounts.clone()
|
||||||
|
|
||||||
|
if b, err := json.Marshal(accounts); err != nil {
|
||||||
|
grpclog.Errorf("failed to save data to %q", s.datafile)
|
||||||
|
} else if err := ioutil.WriteFile(s.datafile, b, 0666); err != nil {
|
||||||
|
grpclog.Errorf("failed to save data to %q", s.datafile)
|
||||||
|
}
|
||||||
|
}
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -0,0 +1,118 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
option go_package = "main";
|
||||||
|
|
||||||
|
import "google/protobuf/timestamp.proto";
|
||||||
|
|
||||||
|
// Support provides an interactive chat service, for customers to interact with
|
||||||
|
// the bank's support agents. A single stream, for either of the two methods, is
|
||||||
|
// a stateful connection to a single "chat session". Streams are initially disconnected
|
||||||
|
// (not part of any session). A stream must be disconnected from a session (via customer
|
||||||
|
// hang up or via agent leaving a session) before it can be connected to a new one.
|
||||||
|
service Support {
|
||||||
|
// ChatCustomer is used by a customer-facing app to send the customer's messages
|
||||||
|
// to a chat session. The customer is how initiates and terminates (via "hangup")
|
||||||
|
// a chat session. Only customers may invoke this method (e.g. requests must
|
||||||
|
// include customer auth credentials).
|
||||||
|
rpc ChatCustomer(stream ChatCustomerRequest) returns (stream ChatCustomerResponse);
|
||||||
|
// ChatAgent is used by an agent-facing app to allow an agent to reply to a
|
||||||
|
// customer's messages in a chat session. The agent may accept a chat session,
|
||||||
|
// which defaults to the session awaiting an agent for the longest period of time
|
||||||
|
// (FIFO queue).
|
||||||
|
rpc ChatAgent(stream ChatAgentRequest) returns (stream ChatAgentResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Void {
|
||||||
|
VOID = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ChatCustomerRequest {
|
||||||
|
oneof req {
|
||||||
|
// init is used when a chat stream is not part of a
|
||||||
|
// chat session. This is a stream's initial state, as well as
|
||||||
|
// the state after a "hang_up" request is sent. This creates
|
||||||
|
// a new state session or resumes an existing one.
|
||||||
|
InitiateChat init = 1;
|
||||||
|
// msg is used to send the customer's messages to support
|
||||||
|
// agents.
|
||||||
|
string msg = 2;
|
||||||
|
// hang_up is used to terminate a chat session. If a stream
|
||||||
|
// is broken, but the session was not terminated, the client
|
||||||
|
// may initiate a new stream and use init to resume that
|
||||||
|
// session. Sessions are not terminated unless done so
|
||||||
|
// explicitly via sending this kind of request on the stream.
|
||||||
|
Void hang_up = 3;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message InitiateChat {
|
||||||
|
string resume_session_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message AgentMessage {
|
||||||
|
string agent_name = 1;
|
||||||
|
string msg = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ChatCustomerResponse {
|
||||||
|
oneof resp {
|
||||||
|
// session is sent from the server when the stream is connected
|
||||||
|
// to a chat session. This happens after an init request is sent
|
||||||
|
// and the stream is connected to either a new or resumed session.
|
||||||
|
Session session = 1;
|
||||||
|
// msg is sent from the server to convey agents' messages to the
|
||||||
|
// customer.
|
||||||
|
AgentMessage msg = 2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message ChatAgentRequest {
|
||||||
|
oneof req {
|
||||||
|
// accept is used when an agent wants to join a customer chat
|
||||||
|
// session. It can be used to connect to a specific session (by
|
||||||
|
// ID), or to just accept the session for which the customer has
|
||||||
|
// been waiting the longest (e.g. poll a FIFO queue of sessions
|
||||||
|
// awaiting a support agent). It is possible for multiple agents
|
||||||
|
// to be connected to the same chat session.
|
||||||
|
AcceptChat accept = 1;
|
||||||
|
// msg is used to send a message to the customer. It will also be
|
||||||
|
// delivered to any other connected support agents.
|
||||||
|
string msg = 2;
|
||||||
|
// leave_session allows an agent to exit a chat session. They can
|
||||||
|
// always re-enter later by sending an accept message for that
|
||||||
|
// session ID.
|
||||||
|
Void leave_session = 3;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message AcceptChat {
|
||||||
|
string session_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ChatEntry {
|
||||||
|
google.protobuf.Timestamp date = 1;
|
||||||
|
oneof entry {
|
||||||
|
string customer_msg = 2;
|
||||||
|
AgentMessage agent_msg = 3;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message ChatAgentResponse {
|
||||||
|
oneof resp {
|
||||||
|
// accepted_session provides the detail of a chat session. The server
|
||||||
|
// sends this message after the agent has accepted a chat session.
|
||||||
|
Session accepted_session = 1;
|
||||||
|
// msg is sent by the server when the customer, or another support
|
||||||
|
// agent, sends a message in stream's current session.
|
||||||
|
ChatEntry msg = 2;
|
||||||
|
// session_ended notifies the support agent that their currently
|
||||||
|
// connected chat session has been terminated by the customer.
|
||||||
|
Void session_ended = 3;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message Session {
|
||||||
|
string session_id = 1;
|
||||||
|
string customer_name = 2;
|
||||||
|
repeated ChatEntry history = 3;
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
# testserver
|
||||||
|
|
||||||
|
The `testserver` program is a simple server that can be used for testing RPC clients such
|
||||||
|
as `grpcurl`. It implements an RPC interface that is defined in `grpcurl`'s [testing package](https://github.com/fullstorydev/grpcurl/blob/master/testing/example.proto) and also exposes [the implementation](https://godoc.org/github.com/fullstorydev/grpcurl/testing#TestServer) that is defined in that same package. This is the same test interface and implementation that is used in unit tests for `grpcurl`.
|
||||||
|
|
||||||
|
For a possibly more interesting test server, take a look at `bankdemo`, which is a demo gRPC app that provides a more concrete RPC interface, including full-duplex bidirectional streaming methods, plus an example implementation.
|
||||||
Loading…
Reference in New Issue