capsule AI-native Unix-like composition layer

src/server/internal/ws/client.go

3,676 bytes · 128 lines · capsule://quake0day/[email protected] raw on github

package ws

import (
	"encoding/json"
	"log"
	"time"

	"github.com/gorilla/websocket"
)

const (
	writeWait             = 10 * time.Second
	pongWait              = 60 * time.Second
	pingPeriod            = (pongWait * 9) / 10
	defaultMaxMessageSize = 1024 * 1024
)

// Client represents a single WebSocket connection.
type Client struct {
	SessionID      string
	Conn           *websocket.Conn
	Send           chan []byte
	MaxMessageSize int64
	hub            *Hub
}

// WSMessage represents a message received from a WebSocket client.
type WSMessage struct {
	Type                   string  `json:"type"`
	Text                   string  `json:"text,omitempty"`
	SDP                    string  `json:"sdp,omitempty"`
	Candidate              string  `json:"candidate,omitempty"`
	SDPMid                 string  `json:"sdp_mid,omitempty"`
	SDPMLine               *uint16 `json:"sdp_mline_index,omitempty"`
	Source                 string  `json:"source,omitempty"`
	Mime                   string  `json:"mime,omitempty"`
	Data                   string  `json:"data,omitempty"`
	Width                  int32   `json:"width,omitempty"`
	Height                 int32   `json:"height,omitempty"`
	TimestampMS            int64   `json:"timestamp_ms,omitempty"`
	FrameSeq               int64   `json:"frame_seq,omitempty"`
	Enabled                bool    `json:"enabled,omitempty"`
	TurnSeq                uint64  `json:"turn_seq,omitempty"`
	SegmentSeq             int64   `json:"segment_seq,omitempty"`
	VideoPresentationLagMS float64 `json:"video_presentation_lag_ms,omitempty"`
	ExcessVideoLagMS       float64 `json:"excess_video_lag_ms,omitempty"`
	JitterBufferDeltaMS    float64 `json:"jitter_buffer_delta_ms,omitempty"`
	Likely                 string  `json:"likely,omitempty"`
}

// ReadPump reads messages from the WebSocket and dispatches them via onMessage.
// Must be run as a goroutine. When it returns, the client is unregistered.
func (c *Client) ReadPump(
	onMessage func(sessionID string, msg WSMessage),
	onActivity func(sessionID string),
) {
	defer func() {
		c.hub.Unregister(c)
		c.Conn.Close()
	}()

	maxMessageSize := c.MaxMessageSize
	if maxMessageSize <= 0 {
		maxMessageSize = defaultMaxMessageSize
	}
	c.Conn.SetReadLimit(maxMessageSize)
	c.Conn.SetReadDeadline(time.Now().Add(pongWait))
	c.Conn.SetPongHandler(func(string) error {
		c.Conn.SetReadDeadline(time.Now().Add(pongWait))
		if onActivity != nil {
			onActivity(c.SessionID)
		}
		return nil
	})

	for {
		_, message, err := c.Conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
				log.Printf("ws: read error for session %s: %v", c.SessionID, err)
			}
			return
		}
		if onActivity != nil {
			onActivity(c.SessionID)
		}

		var msg WSMessage
		if err := json.Unmarshal(message, &msg); err != nil {
			log.Printf("ws: invalid JSON from session %s: %v", c.SessionID, err)
			continue
		}

		onMessage(c.SessionID, msg)
	}
}

// WritePump writes messages from the Send channel to the WebSocket.
// Must be run as a goroutine.
func (c *Client) WritePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.Conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.Send:
			c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
				return
			}

		case <-ticker.C:
			c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}