capsule AI-native Unix-like composition layer

src/server/internal/inference/asr_client.go

1,804 bytes · 95 lines · capsule://quake0day/[email protected] raw on github

package inference

import (
	"context"
	"io"

	pb "github.com/cyberverse/server/internal/pb"
)

// TranscribeStream opens a bidirectional ASR stream. It first sends config,
// then sends each chunk read from audioCh as 16 kHz mono "pcm_s16le" audio,
// and forwards every transcript event the server returns on the first
// returned channel.
//
// Closing audioCh half-closes the stream (CloseSend) to signal end of input.
// Both returned channels are closed when the stream ends. At most one error is
// delivered on the error channel; a clean end of stream (io.EOF from Recv)
// delivers none.
//
// The sender goroutine is always stopped and waited for before the returned
// channels are closed, so it never outlives the stream even if the caller
// keeps audioCh open. Once the stream has ended, audioCh is no longer drained,
// so callers should not block indefinitely when sending to it.
func (c *Client) TranscribeStream(ctx context.Context, audioCh <-chan []byte, config ASRConfig) (<-chan *pb.TranscriptEvent, <-chan error) {
	transcriptCh := make(chan *pb.TranscriptEvent, 8)
	errCh := make(chan error, 1)

	go func() {
		defer close(transcriptCh)
		defer close(errCh)

		// streamCtx scopes both the RPC and the sender goroutine to this call,
		// so the receive side can stop the sender once the stream is over.
		streamCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		stream, err := c.asr.TranscribeStream(streamCtx)
		if err != nil {
			errCh <- err
			return
		}

		// sendAudio sends the config message followed by every audio chunk
		// until audioCh is closed, a Send fails, or streamCtx is done.
		sendAudio := func() error {
			if err := stream.Send(&pb.ASRInput{
				Input: &pb.ASRInput_Config{
					Config: &pb.ASRConfig{
						Provider:  config.Provider,
						Language:  config.Language,
						SessionId: config.SessionID,
					},
				},
			}); err != nil {
				return err
			}
			for {
				select {
				case <-streamCtx.Done():
					// ctx.Err() is nil when the receive side cancelled
					// streamCtx itself (not an error). It is non-nil when the
					// caller cancelled ctx, which is reported.
					return ctx.Err()
				case data, ok := <-audioCh:
					if !ok {
						return nil
					}
					if err := stream.Send(&pb.ASRInput{
						Input: &pb.ASRInput_Audio{
							Audio: &pb.AudioChunk{
								Data:       data,
								SampleRate: 16000,
								Channels:   1,
								Format:     "pcm_s16le",
							},
						},
					}); err != nil {
						return err
					}
				}
			}
		}

		sendDone := make(chan error, 1)
		go func() {
			sendErr := sendAudio()
			// Half-close so the server knows no more audio is coming. gRPC
			// forbids calling CloseSend concurrently with Send, so it stays in
			// this goroutine. It runs before signalling so the sender is fully
			// finished once sendDone is received.
			_ = stream.CloseSend()
			sendDone <- sendErr
		}()

		// receive forwards events until the server ends the stream. It returns
		// nil on a clean end (io.EOF).
		receive := func() error {
			for {
				event, err := stream.Recv()
				if err == io.EOF {
					return nil
				}
				if err != nil {
					return err
				}
				select {
				case transcriptCh <- event:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
		recvErr := receive()

		// Whether the stream ended cleanly or failed, nothing more can be sent.
		// Stop the sender (a no-op if it already finished) and wait for it.
		cancel()
		sendErr := <-sendDone

		switch {
		case recvErr != nil:
			// The receive side carries the authoritative stream status.
			errCh <- recvErr
		case sendErr != nil && sendErr != io.EOF:
			// io.EOF from Send only means the server had already closed the
			// stream; its status was the clean io.EOF observed by Recv.
			errCh <- sendErr
		}
	}()

	return transcriptCh, errCh
}