源码地址
https://gitee.com/bin-0821/chat-room-demo-go-websocket
关于websocket,上一篇文章讲述了如何通过websocket进行服务端与客户端的通信,本篇将会带领大家把各个websocket进行相互通信,在开始之前,请确保有理解
1 go的通道
2 go的线程
3 gin基础
事实上,websocket与websocket之间是无法进行直接相互通信的,需要我们将数据接收后,发送给另一个websocket链接,可以理解为
conn1.ReadJson(&data)
conn2.WriteJson(data)
而建立一个类似微信聊天一样的,能进行多群聊,一对多,一对一的聊天,需要对websocket进行管理,本篇文章的重点便是如何管理好所有用户的websocket连接,主要有以下方面
1,一个根据业务进行设计的数据结构
2,用户上下线后conn的处理
3,用户发送信息的群发或单发
首先,要搞清楚我们在做什么,聊天室要实现的功能类似微信
a,b,c三人,1,2,3 三个聊天室
人员 加入的聊天室
a 1,2,3
b 1,2
c 1
a在1发送信息,全部人都能收到
a在2发送信息,c收不到,以此类推
a可以与c单独发送信息,接收方不在线时,系统能正常运行
1. 目录结构
相比上篇文章 多了manage_socket_conn,service两个模块,重点在于多了manage_socket_conn模块中
D:.
│ go.mod
│ go.sum
│ main.go
│ msg.json
├─api
│ socket_conn.go
├─manage_socket_conn //用户的websocket管理模块
│ char_room_thread.go //线程 主要负责对信息的群发
│ room_set.go //聊天室房间管理,房间的创建,销毁 存储房间内的用户id
│ user_set.go //用户websocket链接管理,信息的发送,存储所有在线的webscoket链接,用户上下线
├─middleware
│ Cros.go
├─model
│ socket_msg_form_front.go
│ to_front.go
├─route
│ route.go
└─service
chat_room.go //数据层,模拟用户加入了那些聊天室
2. 代码内容
user_set.go
package manage_socket_conn
import (
"WebSocketDemo/model"
"errors"
"fmt"
"github.com/gorilla/websocket"
"sync"
)
func init() {
GetUserSet()
}
//用户map 用来存储每个在线的用户id与对应的conn
type userSet struct {
// 用户链接集 用户id => 链接对象
users map[int]*websocket.Conn
lock sync.Mutex
once sync.Once
}
var us = new(userSet)
// 单例模式
func GetUserSet() *userSet {
us.once.Do(func() {
us.users = make(map[int]*websocket.Conn)
us.users[-1] = nil
us.lock = sync.Mutex{}
})
return us
}
// 用户创建发起websocket连接
// join_type 加入模式
// 1 正常加入 占线无法加入
// 2 强制加入 即踢下线前者
func (u *userSet) ConnConnect(user_id, join_type int, conn *websocket.Conn) (int, error) {
u.lock.Lock()
defer u.lock.Unlock()
if join_type == 1 {
// 用户id是否已经在线
if _, ok := u.users[user_id]; ok {
return 1, errors.New("该账号已被登陆")
}
} else if join_type == 2 {
// 如果原用户id 已经存在map内 进行销毁挤出
if conn2, ok := u.users[user_id]; ok {
err := conn2.Close()
if err != nil {
fmt.Println(err)
}
delete(u.users, user_id)
}
// 重新加入
u.users[user_id] = conn
}
return -1, nil
}
// 链接断开
func (u *userSet) ConnDisconnect(user_id int, conn *websocket.Conn) error {
u.lock.Lock()
defer u.lock.Unlock()
if conn2, ok := u.users[user_id]; ok {
if conn == conn2 {
delete(u.users, user_id)
}
} else {
// Log 不存在的链接申请断开
}
return nil
}
// 对单个链接发送信息
func (u *userSet) SendMsgByUid(user_id int, msg interface{}) error {
var err error
if conn, ok := u.users[user_id]; ok {
err = conn.WriteJSON(msg)
} else {
err = errors.New("不存在的链接")
}
return err
}
// 对多个连接发送信息
func (u *userSet) SendMsgByUidList(user_id_list []int, msg interface{}) (id_list []int, err_list []error) {
for _, user_id := range user_id_list {
// 这里判断用户是否自己,是自己就跳过
c := msg.(model.ChatMsg)
if c.ChatMsgType == 1 {
if (c.Data["form_user_id"].(int)) == user_id {
continue
}
}
if conn, ok := u.users[user_id]; ok {
err := conn.WriteJSON(msg)
if err != nil {
id_list = append(id_list, user_id)
err_list = append(err_list, err)
}
} else {
id_list = append(id_list, user_id)
err_list = append(err_list, errors.New("不存在的链接"))
}
}
return
}
room_set.go
package manage_socket_conn
import (
"sync"
)
//群map 用来存储每个群在线的用户id
type roomSet struct {
// 群id 群内的用户id
rooms map[int]map[int]struct{}
lock sync.Mutex
once sync.Once
}
var rs = new(roomSet)
// 单例
func GetRoomSet() *roomSet{
rs.once.Do(func() {
rs.rooms = make(map[int]map[int]struct{})
rs.lock = sync.Mutex{}
})
return rs
}
// 向用户发送
func (r *roomSet)SendMsgToUserList (r_id int ,msg interface{}){
userS := GetUserSet()
r.lock.Lock()
defer r.lock.Unlock()
var user_id_list []int
for key, _ := range r.rooms[r_id] {
user_id_list = append(user_id_list, key)
}
userS.SendMsgByUidList(user_id_list,msg)
}
// 用户下线/退群 退出聊天室链接集合
func (r *roomSet) UserQuitRooms(room_id_list []int ,user_id int) {
r.lock.Lock()
defer r.lock.Unlock()
for _, room_id := range room_id_list {
if v ,ok := r.rooms[room_id];ok {
delete(v,user_id)
// 房间没人就销毁
if len(r.rooms[room_id]) <= 0 {
delete(r.rooms, room_id)
}
}
}
return
}
// 用户上线/入群 加入聊天室连接集合
func (r *roomSet)UserJoinRooms(room_id_list []int,user_id int) {
r.lock.Lock()
defer r.lock.Unlock()
for _, room_id := range room_id_list {
if v,ok := r.rooms[room_id];!ok {
// 房间不存在就创建
r.rooms[room_id] = make(map[int]struct{})
r.rooms[room_id][user_id] = struct{}{}
}else {
v[user_id] = struct{}{}
}
}
return
}
// 用户下线/退群 退出聊天室链接集合
func (r *roomSet) UserQuitRoom(room_id ,user_id int) {
r.lock.Lock()
defer r.lock.Unlock()
if v ,ok := r.rooms[room_id];ok {
delete(v,user_id)
// 房间没人就销毁
if len(r.rooms[room_id]) <= 0 {
delete(r.rooms, room_id)
}
}
return
}
// 用户上线/入群 加入聊天室连接集合
func (r *roomSet)UserJoinRoom(room_id,user_id int) {
r.lock.Lock()
defer r.lock.Unlock()
if v,ok := r.rooms[room_id];!ok {
// 房间不存在就创建
r.rooms[room_id] = make(map[int]struct{})
r.rooms[room_id][user_id] = struct{}{}
}else {
v[user_id] = struct{}{}
}
return
}
char_room_thread.go
package manage_socket_conn
import (
"WebSocketDemo/model"
"fmt"
"sync"
)
var cRoomThread = new(charRoomThread)
type charRoomThread struct {
msgChannel chan model.ConnMsg
lock sync.Mutex
once sync.Once
}
// 向通道发送数据
func (c *charRoomThread)SendMsg(msg model.ConnMsg){
fmt.Println(msg)
c.msgChannel <- msg
}
// 单例
func GetCharRoomThread() *charRoomThread {
cRoomThread.once.Do(func() {
cRoomThread.msgChannel = make(chan model.ConnMsg,30)
cRoomThread.lock = sync.Mutex{}
})
return cRoomThread
}
// 启动通道监听
// ChatMsgType 1 群聊信息 2 一对一信息
func (c *charRoomThread) Start() {
for {
select {
case msg := <-c.msgChannel:
if msg.Msg.ChatMsgType == 1 {
// 标明发送方用户id
msg.Msg.Data["form_user_id"] = msg.FormUserID
// 在这里你可以将聊天信息入库等等操作
// do something
// 发送信息
// 注意 msg.Msg.Data["room_id"].(int) 这种写法在data为nil时 运行时会 panic 导致整个系统停掉
// 所以在上一层最好对数据内容进行判断,再把值发送到通道内
GetRoomSet().SendMsgToUserList(int(msg.Msg.Data["room_id"].(float64)),msg.Msg)
}else if msg.Msg.ChatMsgType == 2 {
msg.Msg.Data["form_user_id"] = msg.FormUserID
// 如果发送不成功 说明接收方不在线
_ = GetUserSet().SendMsgByUid(int(msg.Msg.Data["to_user_id"].(float64)),msg.Msg)
}
}
}
}
main.go做以下修改
package main
import (
"WebSocketDemo/route"
Mg"WebSocketDemo/manage_socket_conn"
)
func main() {
ro := route.GetRoute()
go Mg.GetCharRoomThread().Start()
_ = ro.Run("0.0.0.0:8083")
}
api层 socket_conn.go 内容
package api
import (
Mg "WebSocketDemo/manage_socket_conn"
"WebSocketDemo/model"
Service "WebSocketDemo/service"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"net/http"
"strconv"
)
// websocket配置
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: checkOrigin,
}
func checkOrigin(r *http.Request) bool {
return true
}
var (
ServiceChatRoom Service.ChatRoom
)
// 用户申请创建socket链接
func ConCreateConn(ctx *gin.Context) {
var (
conn *websocket.Conn
err error
user_id int
)
// 获取user_id 这里可以是token,经过中间件解析后的存在 ctx 的user_id
// 为方便演示 这里直接请求头带user_id,正常开发不建议
user_id, err = strconv.Atoi(ctx.GetHeader("user_id"))
if err != nil && user_id <= 0 {
ctx.JSON(200, model.ResDatas(500, "请求必须带user_id"+err.Error(), nil))
return
}
//fmt.Println("user_id", user_id)
// 判断请求过来的链接是否要升级为websocket
if websocket.IsWebSocketUpgrade(ctx.Request) {
// 将请求升级为 websocket链接
conn, err = upgrader.Upgrade(ctx.Writer, ctx.Request, ctx.Writer.Header())
if err != nil {
ctx.JSON(200, model.ResDatas(500, "创建链接失败"+err.Error(), nil))
return
}
}else {
return
}
// 获取用户加入的聊天室id数组
room_ids, _ := ServiceChatRoom.GetUserRoomIds(user_id)
// 用户加入房间集
Mg.GetRoomSet().UserJoinRooms(room_ids,user_id)
// 用户加入链接集
_,_ = Mg.GetUserSet().ConnConnect(user_id,2,conn)
// 用户断开销毁
defer func() {
_ = conn.Close()
// 用户断开时也要销毁在聊天集内的对象
_ = Mg.GetUserSet().ConnDisconnect(user_id,conn)
}()
for {
var msg model.ConnMsg
// ReadJSON 获取值的方式类似于gin的 ctx.ShouldBind() 通过结构体的json映射值
// 如果读不到值 则堵塞在此处
err = conn.ReadJSON(&msg)
if err != nil {
// 写回错误信息
err = conn.WriteJSON(model.ResDatas(400, "获取数据错误:"+err.Error(), nil))
if err != nil {
fmt.Println("用户断开")
return
}
}
// do something.....
msg.FormUserID = user_id
// 发送回信息
//err = conn.WriteJSON(msg)
if err != nil {
fmt.Println("用户断开")
return
}
if err = valMsg(msg);err != nil{
_ = conn.WriteJSON(model.ResDatas(400, "数据不合法:"+err.Error(), nil))
continue
}
// 将数据发送进通道
Mg.GetCharRoomThread().SendMsg(msg)
}
}
// 验证数据 例如用户是否有加入聊天室
func valMsg(msg model.ConnMsg) error {
// do something...
return nil
}
关于代码的解释在注释里已经写的非常清楚了,
主要是两个结构体
//群map 用来存储每个群在线的用户id
type roomSet struct {
// 群id 群内的用户id
rooms map[int]map[int]struct{}
lock sync.Mutex
once sync.Once
}
//用户map 用来存储每个在线的用户id与对应的conn
type userSet struct {
// 用户链接集 用户id => 链接对象
users map[int]*websocket.Conn
lock sync.Mutex
once sync.Once
}
可能有人会问,roomSet.rooms[房间id]map[用户id]struct{}
这里的用户集为什么是map类型,而不是[]int类型
答:
想一下,当用户下线或退出群聊时,怎么在[]int内进行删除该用户的id,注意:此时的[]int是无序的,而加入群时,又要防止id重复,所以实现起来过于麻烦,倒不如使用map,go底层为你封装好的值判断,使用起来会更方便,这里的struct{}是没有意义的,仅作为占位。
当我要把信息发送给群里的所有用户时,先从roomSet根据房间id拿到用户id map,将key转化为[]int,调用userSet的 SendMsgByUidList()方法这样就完成了信息的群发
而一对一的单发就不再重复说了,跳过roomSet,直接发送
其他内容已经在代码注释里讲得非常详细了
3. 具体流程
// 1 main方法内 启动一条线程 监听从api层的socket_conn ConCreateConn()用户写入的发送的值
go Mg.GetCharRoomThread().Start()
// 2 ConCreateConn()方法内
// 获取用户加入的聊天室id数组
room_ids, _ := ServiceChatRoom.GetUserRoomIds(user_id)
// 用户加入房间集
Mg.GetRoomSet().UserJoinRooms(room_ids,user_id)
// 用户加入链接集
_,_ = Mg.GetUserSet().ConnConnect(user_id,2,conn)
defer func() {
_ = conn.Close()
// 用户断开时也要销毁在聊天集内的对象
_ = Mg.GetUserSet().ConnDisconnect(user_id,conn)
}()
//3 用户发送数据
Mg.GetCharRoomThread().SendMsg(msg)
//启动的线程将会接收到通道的数据 char_room_thread
Mg.GetCharRoomThread().Start()
// 判断数据发送类型进行发送
// ChatMsgType 1 群聊信息 2 一对一信息
if msg.Msg.ChatMsgType == 1 {
// 标明发送方用户id
msg.Msg.Data["form_user_id"] = msg.FormUserID
// 在这里你可以将聊天信息入库等等操作
// do something
// 发送信息
// 注意 msg.Msg.Data["room_id"].(int) 这种写法在data为nil时 运行时会 panic 导致整个系统停掉
// 所以在上一层最好对数据内容进行判断,再把值发送到通道内
GetRoomSet().SendMsgToUserList(int(msg.Msg.Data["room_id"].(float64)),msg.Msg)
}else if msg.Msg.ChatMsgType == 2 {
msg.Msg.Data["form_user_id"] = msg.FormUserID
// 如果发送不成功 说明接收方不在线
_ = GetUserSet().SendMsgByUid(int(msg.Msg.Data["to_user_id"].(float64)),msg.Msg)
}
4. 启动项目并测试
使用apipost进行websocket测试
用户加入的群在servic层已经模拟了
func (ChatRoom) GetUserRoomIds(user_id int) (r_ids []int,err error) {
if user_id == 1 {
r_ids = []int{1,2,3}
}else if user_id == 2 {
r_ids = []int{1,2}
}else if user_id == 3 {
r_ids = []int{1}
}
return
}
user_id与接口名称一样,然后将所有websocket进行连接
发送json数据
{
"msg": {
"chat_msg_type": 1,
"data": {
"room_id": 2,
"content": "我是用户1发送的信息,房间2,只有用户3,不在房间"
}
}
}
房间2 只有用户3不存在房间里所以用户3接收不到信息
用户2 发送信息
{
"msg": {
"chat_msg_type": 1,
"data": {
"room_id": 1,
"content": "我是用户2发送的信息,房间1,所有人都可以接收"
}
}
}
可以看到所有人都可以接收信息
一对一测试
用户1发送json给用户3
{
"msg": {
"chat_msg_type": 2,
"data": {
"to_user_id": 3,
"content": "不要让用户2看到"
}
}
}
可以看到只有用户2无法收到一对一的信息
5. 总结
关于websocket系列教程就已经结束了
本章重点在于如何进行设计一个websocket管理模块,对在线的用户进行管理
不足点:
由于怕篇幅过长,没有将聊天数据存储起来,实现原理便是在发送信息前把数据存入库,搭配gorm的事务,当有错误时便回滚,
用户上线时,前端获取本地存储的聊天数据id,拉取最后的数据列表,便可做到用户上线读取未接收的数据,这里可以在api层直接实现
程序的所有问题,大部分都可以通过创造性的思想进行解决,希望本篇内容能对你有所帮助
欢迎大家点赞转发
文章评论