效果
使用gRPC一元通信模式和双向流通信模式写一个简单的控制台聊天室。实现创建用户和实时聊天两个功能,不考虑高性能。复习了内存同步访问Sync包的使用。用切片缓存聊天记录,新用户可以同步聊天记录。
PS C:\Users\小能喵喵喵\Desktop\Go\gRPC\chatroom> tree /f
├───client
│ │ go.mod
│ │ go.sum
│ │ main.go
│ │
│ └───chatroom
│ chat_room.pb.go
│ chat_room_grpc.pb.go
│
├───proto
│ │ chat_room.pb.go
│ │ chat_room.proto
│ │ chat_room_grpc.pb.go
│ │
│ └───google
│ └───protobuf
│ wrappers.proto
│
└───server
│ go.mod
│ go.sum
│ main.go
│ service.go
│
└───chatroom
chat_room.pb.go
chat_room_grpc.pb.go
server.code-workspace
Proto
syntax = "proto3";
import "google/protobuf/wrappers.proto";
package chatroom;
option go_package=".";
service ChatRoom{
rpc login(User) returns(google.protobuf.StringValue);
rpc chat(stream ChatMessage) returns(stream ChatMessage);
}
message User{
string id = 1;
string name = 2;
}
message ChatMessage{
string id = 1;
string name = 2;
uint64 time = 3;
string content = 4;
}
protoc --go_out=. --go-grpc_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative ./chat_room.proto
Server
service.go
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
pb "wolflong.com/chatroom_server/chatroom"
)
// ^ 实现服务
type service struct {
pb.UnimplementedChatRoomServer
chatMessageCache []*pb.ChatMessage
userMap sync.Map
L sync.RWMutex
}
var (
workers map[pb.ChatRoom_ChatServer]pb.ChatRoom_ChatServer = make(map[pb.ChatRoom_ChatServer]pb.ChatRoom_ChatServer)
)
// ^ 实现login用户注册方法
func (s *service) Login(ctx context.Context, in *pb.User) (*wrappers.StringValue, error) {
in.Id = uuid.New().String()
if _, ok := s.userMap.Load(in.Id); ok {
return nil, status.Errorf(codes.AlreadyExists, "已有同名用户,请换个用户名")
}
s.userMap.Store(in.Id, in)
go s.sendMessage(nil, &pb.ChatMessage{Id: "server", Content: fmt.Sprintf("%v 加入聊天室", in.Name), Time: uint64(time.Now().Unix())})
// some work...
return &wrappers.StringValue{Value: in.Id}, status.New(codes.OK, "").Err()
}
// ^ 实现聊天室
func (s *service) Chat(stream pb.ChatRoom_ChatServer) error {
if s.chatMessageCache == nil {
s.chatMessageCache = make([]*pb.ChatMessage, 0, 1024)
}
workers[stream] = stream
for _, v := range s.chatMessageCache {
stream.Send(v)
}
s.recvMessage(stream)
return status.New(codes.OK, "").Err()
}
func (s *service) recvMessage(stream pb.ChatRoom_ChatServer) {
md, _ := metadata.FromIncomingContext(stream.Context())
for {
mes, err := stream.Recv()
if err != nil {
s.L.Lock()
delete(workers, stream)
s.L.Unlock()
s.userMap.Delete(md.Get("uuid")[0])
fmt.Println("某个用户掉线,目前用户在线数量", len(workers))
break
}
s.chatMessageCache = append(s.chatMessageCache, mes)
v, ok := s.userMap.Load(md.Get("uuid")[0])
if !ok {
fmt.Println("致命错误,用户不存在")
return
}
mes.Name = v.(*pb.User).Name
mes.Time = uint64(time.Now().Unix())
s.sendMessage(stream, mes)
}
}
func (s *service) sendMessage(stream pb.ChatRoom_ChatServer, mes *pb.ChatMessage) {
s.L.Lock()
for _, v := range workers {
if v != stream {
err := v.Send(mes)
if err != nil {
// err handle
continue
}
}
}
s.L.Unlock()
}
main.go
package main
import (
"fmt"
"log"
"net"
"google.golang.org/grpc"
pb "wolflong.com/chatroom_server/chatroom"
)
const (
ip = "127.0.0.1"
port = "23333"
)
func main() {
lis, err := net.Listen("tcp", fmt.Sprintf("%v:%v", ip, port))
if err != nil {
log.Fatalf("无法监听端口 %v %v", port, err)
}
s := grpc.NewServer()
// ^ 注册服务
pb.RegisterChatRoomServer(s, &service{})
log.Println("gRPC服务器开始监听", port)
if err := s.Serve(lis); err != nil {
log.Fatalf("提供服务失败: %v", err)
}
}
Client
package main
import (
"bufio"
"context"
"os"
"strings"
"time"
"github.com/pterm/pterm"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/wrapperspb"
pb "wolflong.com/chatroom_client/chatroom"
)
const (
address = "localhost:23333"
)
func main() {
/* ---------------------------------- 连接服务器 --------------------------------- */
spinner, _ := pterm.DefaultSpinner.Start("正在连接聊天室")
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
spinner.Fail("连接失败")
pterm.Fatal.Printfln("无法连接至服务器: %v", err)
return
}
c := pb.NewChatRoomClient(conn)
spinner.Success("连接成功")
/* ---------------------------------- 注册用户名 --------------------------------- */
var val *wrapperspb.StringValue
var user *pb.User
for {
result, _ := pterm.DefaultInteractiveTextInput.Show("创建用户名")
if strings.TrimSpace(result) == "" {
pterm.Error.Printfln("进入聊天室失败,没有取名字")
continue
}
user = &pb.User{Name: result}
val, err = c.Login(context.TODO(), user)
if err != nil {
pterm.Error.Printfln("进入聊天室失败 %v", err)
continue
} else {
break
}
}
user.Id = val.Value
pterm.Success.Println("创建成功!开始聊天吧!")
/* ---------------------------------- 聊天室逻辑 --------------------------------- */
stream, _ := c.Chat(metadata.AppendToOutgoingContext(context.Background(), "uuid", user.Id))
go func(pb.ChatRoom_ChatClient) {
for {
res, _ := stream.Recv()
switch res.Id {
case "server":
pterm.Success.Printfln("(%[2]v) [服务器] %[1]s ", res.Content, time.Unix(int64(res.Time), 0).Format(time.ANSIC))
default:
pterm.Info.Printfln("(%[3]v) %[1]s : %[2]s", res.Name, res.Content, time.Unix(int64(res.Time), 0).Format(time.ANSIC))
}
}
}(stream)
for {
inputReader := bufio.NewReader(os.Stdin)
input, _ := inputReader.ReadString('\n')
input = strings.TrimRight(input, "\r \n")
// pterm.Info.Printfln("%s : %s", user.Name, input)
stream.Send(&pb.ChatMessage{Id: user.Id, Content: input})
}
}
资料
一口气搞懂Go sync-map 所有知识点- 阅坊 (readfog.com)
文章评论