split proto into two; move more logic into db.go to make bank service impl cleaner; add graceful shutdown on signal

This commit is contained in:
Josh Humphries 2018-09-06 20:47:26 -04:00
parent e1a9991958
commit a020c24a83
7 changed files with 1581 additions and 1502 deletions

View File

@ -34,30 +34,7 @@ func (s *bankServer) OpenAccount(ctx context.Context, req *OpenAccountRequest) (
return nil, status.Errorf(codes.InvalidArgument, "invalid account type: %v", req.Type)
}
s.allAccounts.mu.Lock()
defer s.allAccounts.mu.Unlock()
accountNums, ok := s.allAccounts.AccountNumbersByCustomer[cust]
if !ok {
// no accounts for this customer? it's a new customer
s.allAccounts.Customers = append(s.allAccounts.Customers, cust)
}
num := s.allAccounts.LastAccountNum + 1
s.allAccounts.LastAccountNum = num
s.allAccounts.AccountNumbers = append(s.allAccounts.AccountNumbers, num)
accountNums = append(accountNums, num)
s.allAccounts.AccountNumbersByCustomer[cust] = accountNums
var acct account
acct.AccountNumber = num
acct.BalanceCents = req.InitialDepositCents
acct.Transactions = append(acct.Transactions, &Transaction{
AccountNumber: num,
SeqNumber: 1,
Date: ptypes.TimestampNow(),
AmountCents: req.InitialDepositCents,
Desc: "initial deposit",
})
s.allAccounts.AccountsByNumber[num] = &acct
return &acct.Account, nil
return s.allAccounts.openAccount(cust, req.Type, req.InitialDepositCents), nil
}
func (s *bankServer) CloseAccount(ctx context.Context, req *CloseAccountRequest) (*empty.Empty, error) {
@ -66,33 +43,9 @@ func (s *bankServer) CloseAccount(ctx context.Context, req *CloseAccountRequest)
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
s.allAccounts.mu.Lock()
defer s.allAccounts.mu.Unlock()
acctNums := s.allAccounts.AccountNumbersByCustomer[cust]
found := -1
for i, num := range acctNums {
if num == req.AccountNumber {
found = i
break
}
if err := s.allAccounts.closeAccount(cust, req.AccountNumber); err != nil {
return nil, err
}
if found == -1 {
return nil, status.Errorf(codes.NotFound, "you have no account numbered %d", req.AccountNumber)
}
for i, num := range s.allAccounts.AccountNumbers {
if num == req.AccountNumber {
s.allAccounts.AccountNumbers = append(s.allAccounts.AccountNumbers[:i], s.allAccounts.AccountNumbers[i+1:]...)
break
}
}
acct := s.allAccounts.AccountsByNumber[req.AccountNumber]
if acct.BalanceCents != 0 {
return nil, status.Errorf(codes.FailedPrecondition, "account %d cannot be closed because it has a non-zero balance: %s", req.AccountNumber, dollars(acct.BalanceCents))
}
s.allAccounts.AccountNumbersByCustomer[cust] = append(acctNums[:found], acctNums[found+1:]...)
delete(s.allAccounts.AccountsByNumber, req.AccountNumber)
return &empty.Empty{}, nil
}
@ -102,13 +55,7 @@ func (s *bankServer) GetAccounts(ctx context.Context, _ *empty.Empty) (*GetAccou
return nil, status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
s.allAccounts.mu.RLock()
defer s.allAccounts.mu.RUnlock()
accountNums := s.allAccounts.AccountNumbersByCustomer[cust]
var accounts []*Account
for _, num := range accountNums {
accounts = append(accounts, &s.allAccounts.AccountsByNumber[num].Account)
}
accounts := s.allAccounts.getAllAccounts(cust)
return &GetAccountsResponse{Accounts: accounts}, nil
}
@ -118,17 +65,7 @@ func (s *bankServer) GetTransactions(req *GetTransactionsRequest, stream Bank_Ge
return status.Error(codes.Unauthenticated, codes.Unauthenticated.String())
}
acct, err := func() (*account, error) {
s.allAccounts.mu.Lock()
defer s.allAccounts.mu.Unlock()
acctNums := s.allAccounts.AccountNumbersByCustomer[cust]
for _, num := range acctNums {
if num == req.AccountNumber {
return s.allAccounts.AccountsByNumber[num], nil
}
}
return nil, status.Errorf(codes.NotFound, "you have no account numbered %d", req.AccountNumber)
}()
acct, err := s.allAccounts.getAccount(cust, req.AccountNumber)
if err != nil {
return err
}
@ -149,10 +86,7 @@ func (s *bankServer) GetTransactions(req *GetTransactionsRequest, stream Bank_Ge
end = time.Date(9999, 12, 31, 23, 59, 59, 999999999, time.Local)
}
acct.mu.RLock()
txns := acct.Transactions
acct.mu.RUnlock()
txns := acct.getTransactions()
for _, txn := range txns {
t, err := ptypes.Timestamp(txn.Date)
if err != nil {
@ -190,7 +124,11 @@ func (s *bankServer) Deposit(ctx context.Context, req *DepositRequest) (*Balance
if req.Desc != "" {
desc = fmt.Sprintf("%s: %s", desc, req.Desc)
}
newBalance, err := s.newTransaction(cust, req.AccountNumber, req.AmountCents, desc)
acct, err := s.allAccounts.getAccount(cust, req.AccountNumber)
if err != nil {
return nil, err
}
newBalance, err := acct.newTransaction(req.AmountCents, desc)
if err != nil {
return nil, err
}
@ -200,26 +138,6 @@ func (s *bankServer) Deposit(ctx context.Context, req *DepositRequest) (*Balance
}, nil
}
func (s *bankServer) getAccount(cust string, acctNumber uint64) (*account, error) {
s.allAccounts.mu.Lock()
defer s.allAccounts.mu.Unlock()
acctNums := s.allAccounts.AccountNumbersByCustomer[cust]
for _, num := range acctNums {
if num == acctNumber {
return s.allAccounts.AccountsByNumber[num], nil
}
}
return nil, status.Errorf(codes.NotFound, "you have no account numbered %d", acctNumber)
}
func (s *bankServer) newTransaction(cust string, acctNumber uint64, amountCents int32, desc string) (int32, error) {
acct, err := s.getAccount(cust, acctNumber)
if err != nil {
return 0, err
}
return acct.newTransaction(amountCents, desc)
}
func (s *bankServer) Withdraw(ctx context.Context, req *WithdrawRequest) (*BalanceResponse, error) {
cust := getCustomer(ctx)
if cust == "" {
@ -230,7 +148,11 @@ func (s *bankServer) Withdraw(ctx context.Context, req *WithdrawRequest) (*Balan
return nil, status.Errorf(codes.InvalidArgument, "withdrawal amount cannot be non-negative: %s", dollars(req.AmountCents))
}
newBalance, err := s.newTransaction(cust, req.AccountNumber, req.AmountCents, req.Desc)
acct, err := s.allAccounts.getAccount(cust, req.AccountNumber)
if err != nil {
return nil, err
}
newBalance, err := acct.newTransaction(req.AmountCents, req.Desc)
if err != nil {
return nil, err
}
@ -240,10 +162,6 @@ func (s *bankServer) Withdraw(ctx context.Context, req *WithdrawRequest) (*Balan
}, nil
}
func dollars(amountCents int32) string {
return fmt.Sprintf("$%02f", float64(amountCents)/100)
}
func (s *bankServer) Transfer(ctx context.Context, req *TransferRequest) (*TransferResponse, error) {
cust := getCustomer(ctx)
if cust == "" {
@ -265,7 +183,7 @@ func (s *bankServer) Transfer(ctx context.Context, req *TransferRequest) (*Trans
case *TransferRequest_SourceAccountNumber:
srcDesc = fmt.Sprintf("account %06d", src.SourceAccountNumber)
var err error
if srcAcct, err = s.getAccount(cust, src.SourceAccountNumber); err != nil {
if srcAcct, err = s.allAccounts.getAccount(cust, src.SourceAccountNumber); err != nil {
return nil, err
}
}
@ -281,7 +199,7 @@ func (s *bankServer) Transfer(ctx context.Context, req *TransferRequest) (*Trans
case *TransferRequest_DestAccountNumber:
destDesc = fmt.Sprintf("account %06d", dest.DestAccountNumber)
var err error
if destAcct, err = s.getAccount(cust, dest.DestAccountNumber); err != nil {
if destAcct, err = s.allAccounts.getAccount(cust, dest.DestAccountNumber); err != nil {
return nil, err
}
}

File diff suppressed because it is too large Load Diff

View File

@ -33,24 +33,6 @@ service Bank {
rpc Transfer(TransferRequest) returns (TransferResponse);
}
// Support provides an interactive chat service, for customers to interact with
// the bank's support agents. A single stream, for either of the two methods, is
// a stateful connection to a single "chat session". Streams are initially disconnected
// (not part of any session). A stream must be disconnected from a session (via customer
// hang up or via agent leaving a session) before it can be connected to a new one.
service Support {
// ChatCustomer is used by a customer-facing app to send the customer's messages
// to a chat session. The customer is how initiates and terminates (via "hangup")
// a chat session. Only customers may invoke this method (e.g. requests must
// include customer auth credentials).
rpc ChatCustomer(stream ChatCustomerRequest) returns (stream ChatCustomerResponse);
// ChatAgent is used by an agent-facing app to allow an agent to reply to a
// customer's messages in a chat session. The agent may accept a chat session,
// which defaults to the session awaiting an agent for the longest period of time
// (FIFO queue).
rpc ChatAgent(stream ChatAgentRequest) returns (stream ChatAgentResponse);
}
message OpenAccountRequest {
int32 initial_deposit_cents = 1;
Account.Type type = 2;
@ -141,98 +123,3 @@ message TransferResponse {
uint64 dest_account_number = 3;
int32 dest_balance_cents = 4;
}
enum Void {
VOID = 0;
}
message ChatCustomerRequest {
oneof req {
// init is used when a chat stream is not part of a
// chat session. This is a stream's initial state, as well as
// the state after a "hang_up" request is sent. This creates
// a new state session or resumes an existing one.
InitiateChat init = 1;
// msg is used to send the customer's messages to support
// agents.
string msg = 2;
// hang_up is used to terminate a chat session. If a stream
// is broken, but the session was not terminated, the client
// may initiate a new stream and use init to resume that
// session. Sessions are not terminated unless done so
// explicitly via sending this kind of request on the stream.
Void hang_up = 3;
}
}
message InitiateChat {
string resume_session_id = 1;
}
message AgentMessage {
string agent_name = 1;
string msg = 2;
}
message ChatCustomerResponse {
oneof resp {
// session is sent from the server when the stream is connected
// to a chat session. This happens after an init request is sent
// and the stream is connected to either a new or resumed session.
Session session = 1;
// msg is sent from the server to convey agents' messages to the
// customer.
AgentMessage msg = 2;
}
}
message ChatAgentRequest {
oneof req {
// accept is used when an agent wants to join a customer chat
// session. It can be used to connect to a specific session (by
// ID), or to just accept the session for which the customer has
// been waiting the longest (e.g. poll a FIFO queue of sessions
// awaiting a support agent). It is possible for multiple agents
// to be connected to the same chat session.
AcceptChat accept = 1;
// msg is used to send a message to the customer. It will also be
// delivered to any other connected support agents.
string msg = 2;
// leave_session allows an agent to exit a chat session. They can
// always re-enter later by sending an accept message for that
// session ID.
Void leave_session = 3;
}
}
message AcceptChat {
string session_id = 1;
}
message ChatEntry {
google.protobuf.Timestamp date = 1;
oneof entry {
string customer_msg = 2;
AgentMessage agent_msg = 3;
}
}
message ChatAgentResponse {
oneof resp {
// accepted_session provides the detail of a chat session. The server
// sends this message after the agent has accepted a chat session.
Session accepted_session = 1;
// msg is sent by the server when the customer, or another support
// agent, sends a message in stream's current session.
ChatEntry msg = 2;
// session_ended notifies the support agent that their currently
// connected chat session has been terminated by the customer.
Void session_ended = 3;
}
}
message Session {
string session_id = 1;
string customer_name = 2;
repeated ChatEntry history = 3;
}

View File

@ -2,6 +2,7 @@ package main
import (
"bytes"
"fmt"
"sync"
"github.com/golang/protobuf/jsonpb"
@ -21,20 +22,111 @@ type accounts struct {
mu sync.RWMutex
}
func (a *accounts) openAccount(customer string, accountType Account_Type, initialBalanceCents int32) *Account {
a.mu.Lock()
defer a.mu.Unlock()
accountNums, ok := a.AccountNumbersByCustomer[customer]
if !ok {
// no accounts for this customer? it's a new customer
a.Customers = append(a.Customers, customer)
}
num := a.LastAccountNum + 1
a.LastAccountNum = num
a.AccountNumbers = append(a.AccountNumbers, num)
accountNums = append(accountNums, num)
a.AccountNumbersByCustomer[customer] = accountNums
var acct account
acct.AccountNumber = num
acct.BalanceCents = initialBalanceCents
acct.Transactions = append(acct.Transactions, &Transaction{
AccountNumber: num,
SeqNumber: 1,
Date: ptypes.TimestampNow(),
AmountCents: initialBalanceCents,
Desc: "initial deposit",
})
a.AccountsByNumber[num] = &acct
return &acct.Account
}
func (a *accounts) closeAccount(customer string, accountNumber uint64) error {
a.mu.Lock()
defer a.mu.Unlock()
acctNums := a.AccountNumbersByCustomer[customer]
found := -1
for i, num := range acctNums {
if num == accountNumber {
found = i
break
}
}
if found == -1 {
return status.Errorf(codes.NotFound, "you have no account numbered %d", accountNumber)
}
acct := a.AccountsByNumber[accountNumber]
if acct.BalanceCents != 0 {
return status.Errorf(codes.FailedPrecondition, "account %d cannot be closed because it has a non-zero balance: %s", accountNumber, dollars(acct.BalanceCents))
}
for i, num := range a.AccountNumbers {
if num == accountNumber {
a.AccountNumbers = append(a.AccountNumbers[:i], a.AccountNumbers[i+1:]...)
break
}
}
a.AccountNumbersByCustomer[customer] = append(acctNums[:found], acctNums[found+1:]...)
delete(a.AccountsByNumber, accountNumber)
return nil
}
func (a *accounts) getAccount(customer string, accountNumber uint64) (*account, error) {
a.mu.RLock()
defer a.mu.RUnlock()
acctNums := a.AccountNumbersByCustomer[customer]
for _, num := range acctNums {
if num == accountNumber {
return a.AccountsByNumber[num], nil
}
}
return nil, status.Errorf(codes.NotFound, "you have no account numbered %d", accountNumber)
}
func (a *accounts) getAllAccounts(customer string) []*Account {
a.mu.RLock()
defer a.mu.RUnlock()
accountNums := a.AccountNumbersByCustomer[customer]
var accounts []*Account
for _, num := range accountNums {
accounts = append(accounts, &a.AccountsByNumber[num].Account)
}
return accounts
}
type account struct {
Account
Transactions []*Transaction
mu sync.RWMutex
}
func (a *account) newTransaction(amountCents int32, desc string) (int32, error) {
func (a *account) getTransactions() []*Transaction {
a.mu.RLock()
defer a.mu.RUnlock()
return a.Transactions
}
func (a *account) newTransaction(amountCents int32, desc string) (newBalance int32, err error) {
a.mu.Lock()
defer a.mu.Unlock()
newBalance := a.BalanceCents + amountCents
if newBalance < 0 {
bal := a.BalanceCents + amountCents
if bal < 0 {
return 0, status.Errorf(codes.FailedPrecondition, "insufficient funds: cannot withdraw %s when balance is %s", dollars(amountCents), dollars(a.BalanceCents))
}
a.BalanceCents += amountCents
a.BalanceCents = bal
a.Transactions = append(a.Transactions, &Transaction{
AccountNumber: a.AccountNumber,
Date: ptypes.TimestampNow(),
@ -42,7 +134,7 @@ func (a *account) newTransaction(amountCents int32, desc string) (int32, error)
SeqNumber: uint64(len(a.Transactions) + 1),
Desc: desc,
})
return a.BalanceCents, nil
return bal, nil
}
func (t *Transaction) MarshalJSON() ([]byte, error) {
@ -58,7 +150,7 @@ func (t *Transaction) UnmarshalJSON(b []byte) error {
return jsonpb.Unmarshal(bytes.NewReader(b), t)
}
func (a *accounts) Clone() *accounts {
func (a *accounts) clone() *accounts {
var clone accounts
clone.AccountNumbersByCustomer = map[string][]uint64{}
clone.AccountsByNumber = map[uint64]*account{}
@ -91,3 +183,7 @@ func (a *accounts) Clone() *accounts {
return &clone
}
func dollars(amountCents int32) string {
return fmt.Sprintf("$%02f", float64(amountCents)/100)
}

View File

@ -1,6 +1,6 @@
package main
//go:generate protoc --go_out=plugins=grpc:./ bank.proto
//go:generate protoc --go_out=plugins=grpc:./ bank.proto support.proto
import (
"encoding/json"
@ -9,8 +9,9 @@ import (
"io/ioutil"
"net"
"os"
"sync"
"os/signal"
"sync/atomic"
"syscall"
"time"
"golang.org/x/net/context"
@ -67,6 +68,16 @@ func main() {
s.flush()
}()
// trap SIGINT / SIGTERM to exit cleanly
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT)
signal.Notify(c, syscall.SIGTERM)
go func() {
<-c
fmt.Println("Shutting down...")
grpcSvr.GracefulStop()
}()
grpclog.Infof("server starting, listening on %v", l.Addr())
if err := grpcSvr.Serve(l); err != nil {
panic(err)
@ -112,11 +123,9 @@ func gRPCServer() *grpc.Server {
}
type svr struct {
datafile string
ctx context.Context
cancel context.CancelFunc
mu sync.Mutex
datafile string
ctx context.Context
cancel context.CancelFunc
allAccounts accounts
}
@ -149,10 +158,9 @@ func (s *svr) bgSaver() {
}
func (s *svr) flush() {
s.mu.Lock()
defer s.mu.Unlock()
accounts := s.allAccounts.clone()
if b, err := json.Marshal(&s.allAccounts); err != nil {
if b, err := json.Marshal(accounts); err != nil {
grpclog.Errorf("failed to save data to %q", s.datafile)
} else if err := ioutil.WriteFile(s.datafile, b, 0666); err != nil {
grpclog.Errorf("failed to save data to %q", s.datafile)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,118 @@
syntax = "proto3";
option go_package = "main";
import "google/protobuf/timestamp.proto";
// Support provides an interactive chat service, for customers to interact with
// the bank's support agents. A single stream, for either of the two methods, is
// a stateful connection to a single "chat session". Streams are initially disconnected
// (not part of any session). A stream must be disconnected from a session (via customer
// hang up or via agent leaving a session) before it can be connected to a new one.
service Support {
// ChatCustomer is used by a customer-facing app to send the customer's messages
// to a chat session. The customer is how initiates and terminates (via "hangup")
// a chat session. Only customers may invoke this method (e.g. requests must
// include customer auth credentials).
rpc ChatCustomer(stream ChatCustomerRequest) returns (stream ChatCustomerResponse);
// ChatAgent is used by an agent-facing app to allow an agent to reply to a
// customer's messages in a chat session. The agent may accept a chat session,
// which defaults to the session awaiting an agent for the longest period of time
// (FIFO queue).
rpc ChatAgent(stream ChatAgentRequest) returns (stream ChatAgentResponse);
}
enum Void {
VOID = 0;
}
message ChatCustomerRequest {
oneof req {
// init is used when a chat stream is not part of a
// chat session. This is a stream's initial state, as well as
// the state after a "hang_up" request is sent. This creates
// a new state session or resumes an existing one.
InitiateChat init = 1;
// msg is used to send the customer's messages to support
// agents.
string msg = 2;
// hang_up is used to terminate a chat session. If a stream
// is broken, but the session was not terminated, the client
// may initiate a new stream and use init to resume that
// session. Sessions are not terminated unless done so
// explicitly via sending this kind of request on the stream.
Void hang_up = 3;
}
}
message InitiateChat {
string resume_session_id = 1;
}
message AgentMessage {
string agent_name = 1;
string msg = 2;
}
message ChatCustomerResponse {
oneof resp {
// session is sent from the server when the stream is connected
// to a chat session. This happens after an init request is sent
// and the stream is connected to either a new or resumed session.
Session session = 1;
// msg is sent from the server to convey agents' messages to the
// customer.
AgentMessage msg = 2;
}
}
message ChatAgentRequest {
oneof req {
// accept is used when an agent wants to join a customer chat
// session. It can be used to connect to a specific session (by
// ID), or to just accept the session for which the customer has
// been waiting the longest (e.g. poll a FIFO queue of sessions
// awaiting a support agent). It is possible for multiple agents
// to be connected to the same chat session.
AcceptChat accept = 1;
// msg is used to send a message to the customer. It will also be
// delivered to any other connected support agents.
string msg = 2;
// leave_session allows an agent to exit a chat session. They can
// always re-enter later by sending an accept message for that
// session ID.
Void leave_session = 3;
}
}
message AcceptChat {
string session_id = 1;
}
message ChatEntry {
google.protobuf.Timestamp date = 1;
oneof entry {
string customer_msg = 2;
AgentMessage agent_msg = 3;
}
}
message ChatAgentResponse {
oneof resp {
// accepted_session provides the detail of a chat session. The server
// sends this message after the agent has accepted a chat session.
Session accepted_session = 1;
// msg is sent by the server when the customer, or another support
// agent, sends a message in stream's current session.
ChatEntry msg = 2;
// session_ended notifies the support agent that their currently
// connected chat session has been terminated by the customer.
Void session_ended = 3;
}
}
message Session {
string session_id = 1;
string customer_name = 2;
repeated ChatEntry history = 3;
}