forcepush this away if i fail
This commit is contained in:
parent
02044c60b4
commit
d1f1aae0d3
4 changed files with 277 additions and 156 deletions
|
|
@ -1,179 +1,252 @@
|
|||
package rtsp
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jonas-koeritz/actioncam/libipcamera"
|
||||
"github.com/pion/rtp"
|
||||
|
||||
"github.com/bluenviron/gortsplib/v4"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/base"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/description"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
||||
|
||||
// We keep this import so the CreateServer() signature matches the original
|
||||
// repo (actioncam.go calls rtsp.CreateServer(ctx, host, port, camera)).
|
||||
"github.com/jonas-koeritz/actioncam/libipcamera"
|
||||
)
|
||||
|
||||
// Server implements the RTSP protocol to serve a H.264 stream
|
||||
// This is where your RTP relay should send H.264 packets.
|
||||
// The original project uses port 5220 for the preview stream.
|
||||
const defaultRTPInPort = 5220
|
||||
|
||||
// Server wraps the gortsplib server + the in-memory stream.
|
||||
type Server struct {
|
||||
localIP string
|
||||
localPort int
|
||||
listener net.Listener
|
||||
remoteRTPPort int
|
||||
remoteIP string
|
||||
rtpRelay *libipcamera.RTPRelay
|
||||
camera *libipcamera.Camera
|
||||
sdp string
|
||||
context context.Context
|
||||
gs *gortsplib.Server
|
||||
stream *gortsplib.ServerStream
|
||||
media *description.Media
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
rtpConn *net.UDPConn
|
||||
}
|
||||
|
||||
// CreateServer creates a new Server instance
|
||||
func CreateServer(ctx context.Context, localIP string, port int, camera *libipcamera.Camera) *Server {
|
||||
server := &Server{
|
||||
localIP: localIP,
|
||||
localPort: port,
|
||||
camera: camera,
|
||||
remoteRTPPort: 0,
|
||||
remoteIP: "",
|
||||
sdp: fmt.Sprintf("v=0\r\no=- 0 0 IN IP4 %s\r\ns=ActionCamera\r\nt=0 0\r\na=control:*\r\nm=video 0 RTP/AVP 99\r\nc=IN IP4 0.0.0.0\r\na=rtpmap:99 H264/90000\r\na=control:trackID=0", localIP),
|
||||
context: ctx,
|
||||
}
|
||||
return server
|
||||
// handler implements the gortsplib ServerHandler* interfaces.
|
||||
type handler struct {
|
||||
srv *Server
|
||||
}
|
||||
|
||||
// ListenAndServe starts listening for connections and handles them
|
||||
func (s *Server) ListenAndServe() error {
|
||||
log.Printf("%+v\n", *s)
|
||||
listener, err := net.Listen("tcp4", fmt.Sprintf("%s:%d", s.localIP, s.localPort))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.listener = listener
|
||||
// --- RTSP callbacks (multi-client capable) ---
|
||||
|
||||
log.Printf("RTSP Server waiting for connections on %s:%d\n", s.localIP, s.localPort)
|
||||
// OnDescribe: clients ask what the stream looks like (SDP).
|
||||
func (h *handler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
|
||||
log.Printf("RTSP DESCRIBE from %v path=%s", ctx.Conn.NetConn().RemoteAddr(), ctx.Path)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.context.Done():
|
||||
listener.Close()
|
||||
break
|
||||
default:
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
log.Printf("ERROR accepting connection: %s\n", err)
|
||||
}
|
||||
|
||||
log.Printf("Accepted new RTSP Client %s\n", conn.RemoteAddr().String())
|
||||
|
||||
go s.handleClient(conn)
|
||||
}
|
||||
}
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
}, h.srv.stream, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleClient(conn net.Conn) error {
|
||||
packet := make([]string, 0)
|
||||
scanner := bufio.NewScanner(conn)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if len(line) > 0 {
|
||||
packet = append(packet, line)
|
||||
} else {
|
||||
s.handleRequest(packet, conn)
|
||||
packet = make([]string, 0)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
// OnSetup: client wants to SUBSCRIBE to the existing stream.
|
||||
func (h *handler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
|
||||
log.Printf("RTSP SETUP from %v path=%s", ctx.Conn.NetConn().RemoteAddr(), ctx.Path)
|
||||
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
}, h.srv.stream, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleRequest(packet []string, conn net.Conn) {
|
||||
fmt.Printf("C->S:\n%s\n", packet)
|
||||
// OnPlay: client starts receiving packets.
|
||||
func (h *handler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
|
||||
log.Printf("RTSP PLAY from %v path=%s", ctx.Conn.NetConn().RemoteAddr(), ctx.Path)
|
||||
|
||||
request := strings.Split(packet[0], " ")
|
||||
if len(request) != 3 {
|
||||
log.Printf("Received invalid request")
|
||||
return
|
||||
}
|
||||
|
||||
method := request[0]
|
||||
headers := make(map[string]string, 0)
|
||||
for _, header := range packet[1:] {
|
||||
parts := strings.Split(header, ":")
|
||||
if len(parts) >= 2 {
|
||||
headers[parts[0]] = strings.TrimSpace(strings.Join(parts[1:], ":"))
|
||||
}
|
||||
}
|
||||
|
||||
session := fmt.Sprintf("%X", md5.Sum([]byte(conn.RemoteAddr().String())))
|
||||
|
||||
switch method {
|
||||
case "OPTIONS":
|
||||
writeStatus(conn, 200, "OK")
|
||||
replyCSeq(conn, headers)
|
||||
conn.Write([]byte("Public: DESCRIBE, SETUP, PLAY, PAUSE, RECORD\r\n\r\n"))
|
||||
case "DESCRIBE":
|
||||
writeStatus(conn, 200, "OK")
|
||||
replyCSeq(conn, headers)
|
||||
writeHeader(conn, "Content-Type", "application/sdp")
|
||||
writeHeader(conn, "Content-Length", fmt.Sprintf("%d", len(s.sdp)))
|
||||
conn.Write([]byte(fmt.Sprintf("\r\n%s", s.sdp)))
|
||||
case "SETUP":
|
||||
transportDescription := strings.Split(headers["Transport"], ";")
|
||||
rtpDescription := transportDescription[len(transportDescription)-1]
|
||||
remoteRTPPort, err := strconv.ParseInt(strings.Split(strings.Split(rtpDescription, "=")[1], "-")[0], 10, 32)
|
||||
if err != nil {
|
||||
log.Printf("ERROR Parsing RTP description: %s\n", err)
|
||||
return
|
||||
}
|
||||
s.remoteRTPPort = int(remoteRTPPort)
|
||||
s.remoteIP = (conn.RemoteAddr().(*net.TCPAddr)).IP.String()
|
||||
|
||||
log.Printf("Preparing to Stream to %s:%d\n", s.remoteIP, s.remoteRTPPort)
|
||||
|
||||
writeStatus(conn, 200, "OK")
|
||||
replyCSeq(conn, headers)
|
||||
writeHeader(conn, "Transport", headers["Transport"]+";ssrc=0")
|
||||
writeHeader(conn, "Session", session)
|
||||
conn.Write([]byte("\r\n"))
|
||||
|
||||
case "PLAY":
|
||||
s.rtpRelay = libipcamera.CreateRTPRelay(s.context, net.ParseIP(s.remoteIP), s.remoteRTPPort)
|
||||
s.camera.StartPreviewStream()
|
||||
|
||||
writeStatus(conn, 200, "OK")
|
||||
replyCSeq(conn, headers)
|
||||
writeHeader(conn, "Session", session)
|
||||
writeHeader(conn, "RTP-Info", "url="+request[1]+";seq=10;rtptime=10")
|
||||
conn.Write([]byte("\r\n"))
|
||||
case "TEARDOWN":
|
||||
s.rtpRelay.Stop()
|
||||
writeStatus(conn, 200, "OK")
|
||||
replyCSeq(conn, headers)
|
||||
conn.Write([]byte("\r\n"))
|
||||
case "RECORD":
|
||||
s.camera.StartRecording()
|
||||
|
||||
writeStatus(conn, 200, "OK")
|
||||
replyCSeq(conn, headers)
|
||||
writeHeader(conn, "Session", session)
|
||||
conn.Write([]byte("\r\n"))
|
||||
|
||||
default:
|
||||
return
|
||||
}
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func writeStatus(conn net.Conn, status int, statusWord string) {
|
||||
conn.Write([]byte(fmt.Sprintf("RTSP/1.0 %d %s\r\n", status, statusWord)))
|
||||
// (Optional) For logging / debugging:
|
||||
func (h *handler) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
|
||||
log.Printf("RTSP connection opened: %v", ctx.Conn.NetConn().RemoteAddr())
|
||||
}
|
||||
|
||||
func replyCSeq(conn net.Conn, headers map[string]string) {
|
||||
writeHeader(conn, "CSeq", headers["CSeq"])
|
||||
func (h *handler) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
|
||||
log.Printf("RTSP connection closed: %v (err=%v)", ctx.Conn.NetConn().RemoteAddr(), ctx.Error)
|
||||
}
|
||||
|
||||
func writeHeader(conn net.Conn, key, value string) {
|
||||
conn.Write([]byte(fmt.Sprintf("%s: %s\r\n", key, value)))
|
||||
// --- Public API ---
|
||||
|
||||
// CreateServer creates and starts a RTSP server that serves *one* H.264 video
|
||||
// stream, backed by a single UDP source (127.0.0.1:defaultRTPInPort).
|
||||
//
|
||||
// It keeps the original function signature from the repo:
|
||||
// rtsp.CreateServer(applicationContext, host, port, camera)
|
||||
// but the camera is NOT used directly here – you’re expected to start the
|
||||
// RTP relay separately so that it forwards H.264 RTP packets into
|
||||
// 127.0.0.1:defaultRTPInPort.
|
||||
//
|
||||
// Multiple RTSP clients are automatically supported: all of them read from
|
||||
// the same gortsplib.ServerStream.
|
||||
func CreateServer(parentCtx context.Context, host string, port int, _ *libipcamera.Camera) (*Server, error) {
|
||||
ctx, cancel := context.WithCancel(parentCtx)
|
||||
|
||||
// Build an in-memory H.264 stream description suitable for gortsplib.
|
||||
stream, media, err := newH264Stream()
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("create H264 stream: %w", err)
|
||||
}
|
||||
|
||||
// Listen for incoming RTP packets from the RTP relay.
|
||||
rtpAddr := &net.UDPAddr{
|
||||
IP: net.ParseIP("127.0.0.1"),
|
||||
Port: defaultRTPInPort,
|
||||
}
|
||||
|
||||
rtpConn, err := net.ListenUDP("udp", rtpAddr)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("listen udp %v: %w", rtpAddr, err)
|
||||
}
|
||||
|
||||
srv := &Server{
|
||||
stream: stream,
|
||||
media: media,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
rtpConn: rtpConn,
|
||||
}
|
||||
|
||||
h := &handler{srv: srv}
|
||||
|
||||
// Configure gortsplib server.
|
||||
gs := &gortsplib.Server{
|
||||
Handler: h,
|
||||
RTSPAddress: fmt.Sprintf("%s:%d", host, port),
|
||||
// If you also want UDP transport for clients, set UDPRTPAddress/UDPRTCPAddress here.
|
||||
// For most use cases, TCP (interleaved over RTSP) is fine.
|
||||
}
|
||||
srv.gs = gs
|
||||
|
||||
// Start the UDP → RTSP pump.
|
||||
srv.wg.Add(1)
|
||||
go srv.rtpPump()
|
||||
|
||||
// Start RTSP server in the background.
|
||||
go func() {
|
||||
log.Printf("RTSP server listening on rtsp://%s:%d/ (gortsplib)", host, port)
|
||||
|
||||
if err := gs.StartAndWait(); err != nil {
|
||||
log.Printf("RTSP server stopped with error: %v", err)
|
||||
}
|
||||
cancel()
|
||||
}()
|
||||
|
||||
return srv, nil
|
||||
}
|
||||
|
||||
// Stop stops listening for connections
|
||||
func (s *Server) Stop() {
|
||||
s.listener.Close()
|
||||
// Close shuts down the RTSP server and the RTP pump.
|
||||
func (s *Server) Close() {
|
||||
s.cancel()
|
||||
if s.gs != nil {
|
||||
s.gs.Close()
|
||||
}
|
||||
if s.rtpConn != nil {
|
||||
_ = s.rtpConn.Close()
|
||||
}
|
||||
s.wg.Wait()
|
||||
if s.stream != nil {
|
||||
s.stream.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// --- internals ---
|
||||
|
||||
// newH264Stream builds a single-video-track ServerStream with a valid clock rate.
|
||||
//
|
||||
// NOTE:
|
||||
// * SPS / PPS here are "generic valid" values, not tuned to your camera.
|
||||
// * For best results, you can parse the real SPS/PPS from your camera.sdp and
|
||||
// drop them in here.
|
||||
func newH264Stream() (*gortsplib.ServerStream, *description.Media, error) {
|
||||
// Generic baseline-profile SPS/PPS. They just need to be *valid* so that
|
||||
// the library knows the clock rate and doesn't panic ("non-positive interval
|
||||
// for NewTicker") :contentReference[oaicite:1]{index=1}
|
||||
h264 := &format.H264{
|
||||
PayloadTyp: 96,
|
||||
SPS: []byte{
|
||||
0x67, 0x42, 0xC0, 0x1F, 0x96, 0x54, 0x05, 0x01, 0xED, 0x00, 0xF0, 0x88, 0x45, 0x80,
|
||||
},
|
||||
PPS: []byte{
|
||||
0x68, 0xCE, 0x38, 0x80,
|
||||
},
|
||||
PacketizationMode: 1,
|
||||
}
|
||||
|
||||
media := &description.Media{
|
||||
Type: description.MediaTypeVideo,
|
||||
// You could also set media.Control if you want a specific track URL.
|
||||
Formats: []format.Format{h264},
|
||||
}
|
||||
|
||||
desc := &description.Session{
|
||||
Medias: []*description.Media{media},
|
||||
}
|
||||
|
||||
stream := &gortsplib.ServerStream{
|
||||
Desc: desc,
|
||||
}
|
||||
if err := stream.Initialize(); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return stream, media, nil
|
||||
}
|
||||
|
||||
// rtpPump reads RTP packets from UDP and pushes them into the gortsplib stream.
|
||||
// Every connected RTSP client gets the same packets (multi-client fan-out).
|
||||
func (s *Server) rtpPump() {
|
||||
defer s.wg.Done()
|
||||
|
||||
buf := make([]byte, 2048)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Avoid blocking forever so we can react to shutdown.
|
||||
_ = s.rtpConn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
|
||||
n, _, err := s.rtpConn.ReadFromUDP(buf)
|
||||
if err != nil {
|
||||
if ne, ok := err.(net.Error); ok && ne.Timeout() {
|
||||
continue
|
||||
}
|
||||
// If the context is done, exit quietly.
|
||||
if s.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("RTP read error: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
var pkt rtp.Packet
|
||||
if err := pkt.Unmarshal(buf[:n]); err != nil {
|
||||
// Ignore malformed packets.
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.stream.WritePacketRTP(s.media, &pkt); err != nil {
|
||||
// This is non-fatal; clients can disconnect at any time.
|
||||
log.Printf("WritePacketRTP error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue