src/server/internal/ws/client.go
3,676 bytes · 128 lines · capsule://quake0day/[email protected]
raw on github
package ws
import (
"encoding/json"
"log"
"time"
"github.com/gorilla/websocket"
)
const (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = (pongWait * 9) / 10
defaultMaxMessageSize = 1024 * 1024
)
// Client represents a single WebSocket connection.
type Client struct {
SessionID string
Conn *websocket.Conn
Send chan []byte
MaxMessageSize int64
hub *Hub
}
// WSMessage represents a message received from a WebSocket client.
type WSMessage struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
SDP string `json:"sdp,omitempty"`
Candidate string `json:"candidate,omitempty"`
SDPMid string `json:"sdp_mid,omitempty"`
SDPMLine *uint16 `json:"sdp_mline_index,omitempty"`
Source string `json:"source,omitempty"`
Mime string `json:"mime,omitempty"`
Data string `json:"data,omitempty"`
Width int32 `json:"width,omitempty"`
Height int32 `json:"height,omitempty"`
TimestampMS int64 `json:"timestamp_ms,omitempty"`
FrameSeq int64 `json:"frame_seq,omitempty"`
Enabled bool `json:"enabled,omitempty"`
TurnSeq uint64 `json:"turn_seq,omitempty"`
SegmentSeq int64 `json:"segment_seq,omitempty"`
VideoPresentationLagMS float64 `json:"video_presentation_lag_ms,omitempty"`
ExcessVideoLagMS float64 `json:"excess_video_lag_ms,omitempty"`
JitterBufferDeltaMS float64 `json:"jitter_buffer_delta_ms,omitempty"`
Likely string `json:"likely,omitempty"`
}
// ReadPump reads messages from the WebSocket and dispatches them via onMessage.
// Must be run as a goroutine. When it returns, the client is unregistered.
func (c *Client) ReadPump(
onMessage func(sessionID string, msg WSMessage),
onActivity func(sessionID string),
) {
defer func() {
c.hub.Unregister(c)
c.Conn.Close()
}()
maxMessageSize := c.MaxMessageSize
if maxMessageSize <= 0 {
maxMessageSize = defaultMaxMessageSize
}
c.Conn.SetReadLimit(maxMessageSize)
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
c.Conn.SetPongHandler(func(string) error {
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
if onActivity != nil {
onActivity(c.SessionID)
}
return nil
})
for {
_, message, err := c.Conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
log.Printf("ws: read error for session %s: %v", c.SessionID, err)
}
return
}
if onActivity != nil {
onActivity(c.SessionID)
}
var msg WSMessage
if err := json.Unmarshal(message, &msg); err != nil {
log.Printf("ws: invalid JSON from session %s: %v", c.SessionID, err)
continue
}
onMessage(c.SessionID, msg)
}
}
// WritePump writes messages from the Send channel to the WebSocket.
// Must be run as a goroutine.
func (c *Client) WritePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.Conn.Close()
}()
for {
select {
case message, ok := <-c.Send:
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
return
}
case <-ticker.C:
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}