基于Websocket的消息传输系统

更新于 2020-12-31

鄙人就是普通本科生,编码水平谦虚,大多数代码源自其他项目,比如一个叫leaf的游戏框架。

leafgithub.com

但是作为普通的穷苦人民,在上面催你的时候,很难静下新来,慢慢的打开vscode去一行一行的看,比如你现在可能就是在火急火燎的想在知乎上寻求一个解决方案。

先说一下我的使用场景。我的场景是这样:

用户会登陆软件后,需要一些认证方法,连接上socket,然后会进行高速传输,比如消息、群消息、交易拍卖、其他等等一系列消息,主体就是用户,客体是其他用户或者服务器。

架构

我现对整个项目架构描述一下

img

先简述一下Hub的设计,总的一个Hub包含Client这样一个map用来储存连接信息,然后每个连接有一个Reader和一个Write来从客户端读取和写入客户端。

按照图中标志,按实际执行顺序来讲

①. 客户端向服务端发起Socket连接,(服务器识别到这是Socket连接之后有个Upgrade操作),这里不太明白的可以去看看Socket连接的过程,并不复杂。

③. Reader收到的第一条消息,是注册消息,一般又客户端OnConnect的时候发送,一般会携带认证信息进行身份认证。

②. 认证成功之后服务器应该返回用户类似于JWT或者SessionCookie类的东西,或者没有也没关系,因为每次重新创建连接都要认证,我用JWT是为了做访问控制。

如果没有其他异常:就会一直处在①、②、①、②……这样的收发状态。直到…….

④. 注销,一旦客户端退出或者网络中断等异常,就会出现断开。

好了讲了整个东西架构,让我们来看看怎么实现的把,showcode时间到了。

实现

首先,让我们来看看hub的结构吧,说结构还真的就是结构

1
2
3
4
5
6
7
8
// 整个项目的交流系统,由clients和各种不同的chan组成,
type Hub struct {
 Clients      map[*Client]bool   // 储存保持连接的用户
 UserToClient map[string]*Client // 获得用户和其连接的对应关系
 Message      chan []byte        // 广播消息队列
 Register     chan *Client       // 发起注册请求的队列
 Unregister   chan *Client       // 断开连接的队列
}

注释讲的比较清楚了,我中间使用UserToClient这个map来方便找到我们的Client而已,

注意,Client我们存的是指针,因此在这里并没有太多冗余,我只是为了方便找用户,而且不用再遍历。

这里大家有没有注意到一个好玩的东西叫做chan ,这个东西在C/C++里叫做pipe,但是我个人觉得在go里爽的不止那么一点点,有这个东西,我的思路清晰多了。

Golang 给我们提供了一个类似于管道的东西,目的是进行协程间通信。我们先假装自己不知道goroutine这个东西,我们就可以认为 go produce(ch) 就相当于手动执行了一个程序,传入了我们事先 make 的一个长度(缓冲长度)为 5 的chan ch := make(chan int, 5) 这个管道有点像。

那么我们在本次项目当中也会使用这种 chan ,功能就是这样:

再补充一个关键词 阻塞(block) 就是当程序执行到 v := <- chan 然后这个 chan 里没东西的时候,程序就会阻塞。那么好我们来看一下怎么用上面的那个Hub的结构。

忘了说,这个Hub是单例,也就是说整个代码里只运行一份。因此一个Hub要写完整个项目的逻辑,那么我就从hub的创建开始

1
2
3
4
5
6
7
8
9
func NewHub() *Hub {
 return &Hub{
  Message:      make(chan []byte),        // 消息队列
  Register:     make(chan *Client),       // 等待创建连接的用户
  Unregister:   make(chan *Client),       // 等待退出断开连接的用户
  Clients:      make(map[*Client]bool),   // 客户数组
  UserToClient: make(map[string]*Client), //建立用户名和客户端唯的唯一通道
 }
}

main函数中运行hub并且run起来

1
2
3
4
// 创建端系统的hub
 hub := starlight.NewHub()
 // 运行hub
 go hub.Run()

我的天,我给项目起了个名字,叫星光,太羞耻了。

当同时操控好几 chan 的时候golang官方推荐一种处理方式使用语句 select,以下案例来自官方。

If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the “select” statement blocks until at least one of the communications can proceed.

1
2
3
4
5
6
7
8
//select基本用法
select {
case <- chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程

这样就如何使用hub就很明朗了,你只需要将自己游戏或者软件里的逻辑每个业务需要使用 chan 的让他循环跑起来,像这样

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Run 跑起来,完成和用户保持连接,注册,注销用户等功能,控制消息队列
func (h *Hub) Run() {
 for {
  select {

  case client := <-h.Register: // 用户创建新的连接
   {
    h.Clients[client] = true
   }

  case client := <-h.Unregister: // 用户断开连接
   {
    if _, ok := h.Clients[client]; ok {
     delete(h.Clients, client)
     close(client.Send)
    }
    delete(h.UserToClient, client.Info.UserID)
   }
  // 这个是全局消息 每个人都会收到
  case message := <-h.Message:
   {
    for client := range h.Clients {
     select {
     case client.Send <- message:
     default:
      close(client.Send)
      delete(h.Clients, client)
     }
    }
   }
  }
 }
}

第一个 case client := <- h.Register也就是从注册队列中取一个出来执行注册逻辑,下面是注销逻辑,记得用了哪些东西必须删除,然后下面那个Message是大喇叭,所有客户端都会收到消息,写好这个东西,你的加特林就已经转起来了,虽然还没有一发子弹。

那我们来看一下简单逻辑。

1
2
3
4
// 监听消息
 http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  wshandler.ServeWs(hub, w, r)
 })

这里我使用的是官方http库,如果要使用beego或者iris等框架,应该有对应的 handler。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// serveWs handles websocket requests from the peer.
func ServeWs(hub *starlight.Hub, w http.ResponseWriter, r *http.Request) {
 //完成之后将http请求升级成socket连接
 conn, err := upgrader.Upgrade(w, r, nil)
 if err != nil {
  log.Println(err)
  return
 }
 // 初始化该用户
 c := &starlight.Client{Hub: hub, Conn: conn, Send: make(chan []byte, 256)}
 // 连接注册
 hub.Register <- c

 go c.ReadPump()
 go c.WritePump()
}

我用升级这个词可能不太合适,但是官方库的函数名称就叫做 Upgrade,然后中间用到了一个 upgrader 这个是官方写法,应该算是一个中间件,写法如下,大概意思就是 HTTP 遇到 101 code,然后 Golang hijike 将 HTTP 内的 TCP 链接取出来,协议升级到 WebSocket。

1
2
3
4
5
var upgrader = websocket.Upgrader{
 ReadBufferSize:  1024,
 WriteBufferSize: 1024,
 CheckOrigin:     func(r *http.Request) bool { return true },
}

现在幻想一下,http服务也跑起来了,hub也跑起来了,每一个用户只要请求,服务器的/ws,他发起的Socket连接就会被 upgrade 成一个WebSocket连接,并且被Client持有,他的指针会被推入注册的的队列中,你再会看一下之前的注册逻辑,其实就是把 Client 这个 map 的对应项设置为 true。到现在为止客户端已经可以和用户保持连接了。但是还不能收发数据。根据更前方的框架图主要在go c.ReadPump()go c.WritePump() 这两个函数上,他们在处理,和用户交互的逻辑。 我们一样的进来看看这两个函数怎么写的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func (c *Client) ReadPump() {

 defer func() {
  c.Hub.Unregister <- c
  _ = c.Conn.Close()
 }()

 c.Conn.SetReadLimit(maxMessageSize)
 _ = c.Conn.SetReadDeadline(time.Now().Add(pongWait))
 c.Conn.SetPongHandler(func(string) error {
  _ = c.Conn.SetReadDeadline(time.Now().Add(pongWait))
  return nil
 })

 // 第一个消息就是处理注册
 if err := c.OnLogin(); err != nil {
  log.Println(err.Error())
  return
 }
 for { ........

无非就是一些配置,然后再循环外面执行OnConnect的操作,我们来看看 Login 咋写的我都快忘了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func (c *Client) OnLogin() error {

 // 接收用户的消息头
 _, message, err := c.Conn.ReadMessage()
 if err != nil {
  return err
 }
        // 回复注册
 c.Send <- message
}

我这里没有做什么特殊处理,也就是将用户发过来的消息原封不动发回去就算是成功注册了,但是在业务逻辑当中你可能需要比如,认证用户,并且加入某种权限组之类的操作,一类操作。在这里我就不再详细说。

你们有没有注意到ReadPump()函数开始写了一段defer

1
2
3
4
defer func() {
  c.Hub.Unregister <- c
  _ = c.Conn.Close()
 }()

这个东西作用就是一旦用户出现任何异常,立马return立马关闭他的连接,防止出现任何异常,你可以写上其他操作,不过一般 注销逻辑都放在hub下面的for select那写了。

ReadPump后面for循环内的逻辑就比较简单了,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
for {

  // 读取消息
  _, message, err := c.Conn.ReadMessage()

  if err != nil {
   if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
    log.Printf("error: %v", err)
   }
   break
  }
  mess := Message{}

  err = json.Unmarshal(message, &mess)
  if err != nil {
   fmt.Println(err)
   continue
  }
........

我还是不全部粘贴了,我的垃圾代码实在是太长了,主要我的业务逻辑比较复杂。

你会发现这样一个问题,你从c是可以取到这个 client 的标识的,并且可与取到 hub,那么爽的来了,当某 c 说要发消息给另一个 c 的时候,我们就直接粗暴解决问题

1
c.Hub.UserToClient[mess.ToUserID].Send <- res

简直爽到不能在爽,你的加特林一直在疯狂转(其实是阻塞的),你只要把消息用 <-这个操作符推进去,消息就会自动通过WritePump发出去。

那么这个项目算不算完成了,暂时没有,WritePump也看一眼,这个东西是一劳永逸的函数写完一次就不用动了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (c *Client) WritePump() {
 ticker := time.NewTicker(pingPeriod)
 defer func() {
  ticker.Stop()
  _ = c.Conn.Close()
 }()

 // 发送消息的队列
 for {
  select {
  case message, ok := <-c.Send:
   _ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
   if !ok {
    // The hub closed the channel.
    _ = c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
    return
   }
   // TODO:这里是一个发送内容需要指定的Writer
   w, err := c.Conn.NextWriter(websocket.TextMessage)
   if err != nil {
    return
   }
   
   _, _ = w.Write(message)

   n := len(c.Send)
   for i := 0; i < n; i++ {

    _, _ = w.Write(<-c.Send)
   }

   // 差错
   if err := w.Close(); err != nil {
    return
   }
  case <-ticker.C:
   _ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
   if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
    return
   }
  }
 }
}

再整理一遍,

1、 初始化一个Hub,并且运行起来

2、接收用户请求,upgrade成socket连接并且交给hub持有。

3、运行Readpump()WritePump()来从客户端读取消息,并且写入消息。

4、每个队列各司其职,完美解决消息传输业务。

就酱,最近人在山东,太冷了,冷影响写代码了。

加油,各位代码狗子们,我已经写了一年半的代码了,进展很快。现在上手什么语言过渡都很快,很适合写垃圾业务逻辑代码,啥时候能给我个清净的研究的时间啊,哎。

Last updated on Sep 05, 2023 08:48 UTC