diff --git a/actioncam.go b/actioncam.go index fb3617e..88224f4 100644 --- a/actioncam.go +++ b/actioncam.go @@ -55,7 +55,7 @@ func main() { Args: cobra.MaximumNArgs(1), Run: func(cmd *cobra.Command, args []string) { defer camera.Disconnect() - relay := libipcamera.CreateRTPRelay(applicationContext, net.ParseIP("127.0.0.1"), 5220) + relay := libipcamera.CreateRTPRelay(applicationContext) defer relay.Stop() camera.StartPreviewStream() @@ -271,10 +271,15 @@ func main() { if(host == "") { host = "127.0.0.1" } - - rtspServer, err := rtsp.CreateServer(applicationContext, host, port, camera) + relay := libipcamera.CreateRTPRelay(applicationContext) + rtspServer, err := rtsp.CreateServer(applicationContext, host, port, relay) defer rtspServer.Close() + + if err := camera.StartPreviewStream(); err != nil { + log.Printf("ERROR preview: %v", err) + return + } log.Printf("Created RTSP Server\n") if err != nil { diff --git a/libipcamera/RTPRelay.go b/libipcamera/RTPRelay.go index c009791..bf7e3a8 100644 --- a/libipcamera/RTPRelay.go +++ b/libipcamera/RTPRelay.go @@ -11,36 +11,36 @@ import ( "time" ) -// RTPRelay holds information on the relaying stream listener -type RTPRelay struct { - close bool - targetIP net.IP - targetPort int - listener net.PacketConn - context context.Context +// Frame is a decoded RTP packet buffer forwarded to the RTSP server +type Frame struct { + Payload []byte } -var close bool +// RTPRelay holds information on the relaying stream listener +type RTPRelay struct { + close bool + listener net.PacketConn + context context.Context + Output chan Frame // 🚨 NEW: channel to push decoded RTP frames +} + +var closeFlag bool // CreateRTPRelay creates a UDP listener that handles live data -// from the camera and forwards it as an RTP stream -func CreateRTPRelay(ctx context.Context, targetAddress net.IP, targetPort int) *RTPRelay { +// and pushes RTP-ready payloads into Output channel. +func CreateRTPRelay(ctx context.Context) *RTPRelay { conn, err := net.ListenPacket("udp", ":6669") if err != nil { log.Printf("ERROR: %s\n", err) } - - close = false + + closeFlag = false relay := RTPRelay{ - close: false, - targetIP: targetAddress, - targetPort: targetPort, - listener: conn, - context: ctx, - } - if err != nil { - log.Printf("ERROR: %s\n", err) + close: false, + listener: conn, + context: ctx, + Output: make(chan Frame, 100), // buffered for smoother streaming } go handleCameraStream(relay, conn) @@ -55,92 +55,85 @@ func handleCameraStream(relay RTPRelay, conn net.PacketConn) { header := streamHeader{} var payload []byte - rtpTarget := net.UDPAddr{ - IP: relay.targetIP, - Port: relay.targetPort, - } - rtpSource, _ := net.ResolveUDPAddr("udp", "127.0.0.1") - rtpConn, err := net.DialUDP("udp", rtpSource, &rtpTarget) - if err != nil { - log.Printf("ERROR creating RTP sender: %s\n", err) - } - var sequenceNumber uint16 var elapsed uint32 frameBuffer := bytes.Buffer{} packetBuffer := bytes.Buffer{} - T: - for { - conn.SetReadDeadline(time.Now().Add(10 * time.Second)) - - select { - case <-relay.context.Done(): - log.Println("Context Done") - rtpConn.Close() + +T: + for { + conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + + select { + case <-relay.context.Done(): + log.Println("Context Done") + relay.listener.Close() + close(relay.Output) + break T + default: + if closeFlag { relay.listener.Close() + close(relay.Output) break T - default: - if close { - rtpConn.Close() - relay.listener.Close() - break T - } - - conn.ReadFrom(buffer) - packetReader.Reset(buffer) + } - binary.Read(packetReader, binary.BigEndian, &header) + conn.ReadFrom(buffer) + packetReader.Reset(buffer) - if header.Magic != 0xBCDE { - log.Printf("Received message with invalid magic (%x).", header.Magic) + binary.Read(packetReader, binary.BigEndian, &header) + + if header.Magic != 0xBCDE { + log.Printf("Received message with invalid magic (%x).", header.Magic) + break + } + + if header.Length > 0 { + payload = make([]byte, header.Length) + _, err := io.ReadFull(packetReader, payload) + if err != nil { + log.Printf("Read Error: %s\n", err) break } + } else { + payload = []byte{} + } - if header.Length > 0 { - payload = make([]byte, header.Length) - _, err := io.ReadFull(packetReader, payload) - if err != nil { - log.Printf("Read Error: %s\n", err) - break - } - } else { - payload = []byte{} - } + switch header.MessageType { + case 0x0001: // H.264 Data + frameBuffer.Write(payload) - switch header.MessageType { - case 0x0001: // H.264 Data - frameBuffer.Write(payload) - case 0x0002: // Time - // Append the Framebuffer - packetBuffer.Write(frameBuffer.Bytes()) + case 0x0002: // Time + // Append full frame + packetBuffer.Write(frameBuffer.Bytes()) - // Send out the packet - rtpConn.Write(packetBuffer.Bytes()) + // Push to RTSP server (in-memory) + relay.Output <- Frame{Payload: append([]byte{}, packetBuffer.Bytes()...)} - // Prepare the next packet - packetBuffer.Reset() - packetBuffer.Write([]byte{0x80, 0x63}) - binary.Write(&packetBuffer, binary.BigEndian, sequenceNumber+1) - binary.Write(&packetBuffer, binary.BigEndian, (uint32)(elapsed)*90) - binary.Write(&packetBuffer, binary.BigEndian, (uint64(0))) + // Reset the next packet + packetBuffer.Reset() + packetBuffer.Write([]byte{0x80, 0x63}) + binary.Write(&packetBuffer, binary.BigEndian, sequenceNumber+1) + binary.Write(&packetBuffer, binary.BigEndian, (uint32)(elapsed)*90) + binary.Write(&packetBuffer, binary.BigEndian, (uint64(0))) - // Reset the Framebuffer - frameBuffer.Reset() - sequenceNumber++ + frameBuffer.Reset() + sequenceNumber++ + elapsed = binary.LittleEndian.Uint32(payload[12:]) - elapsed = binary.LittleEndian.Uint32(payload[12:]) - default: - log.Printf("Received Unknown Message: %+v\n", header) - log.Printf("Payload:\n%s\n", hex.Dump(payload)) - } + default: + log.Printf("Received Unknown Message: %+v\n", header) + log.Printf("Payload:\n%s\n", hex.Dump(payload)) } } + } + + close(relay.Output) } // Stop stops listening for packets func (r *RTPRelay) Stop() { - close = true + closeFlag = true r.close = true r.listener.Close() } diff --git a/rtsp/RTSPServer.go b/rtsp/RTSPServer.go index 349c987..b3daf28 100644 --- a/rtsp/RTSPServer.go +++ b/rtsp/RTSPServer.go @@ -1,252 +1,139 @@ package rtsp import ( - "context" - "fmt" - "log" - "net" - "sync" - "time" + "context" + "fmt" + "log" + "sync" - "github.com/pion/rtp" + "github.com/pion/rtp" + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/base" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4" - "github.com/bluenviron/gortsplib/v4/pkg/base" - "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" - - // We keep this import so the CreateServer() signature matches the original - // repo (actioncam.go calls rtsp.CreateServer(ctx, host, port, camera)). - "github.com/jonas-koeritz/actioncam/libipcamera" + "github.com/jonas-koeritz/actioncam/libipcamera" ) -// This is where your RTP relay should send H.264 packets. -// The original project uses port 5220 for the preview stream. -const defaultRTPInPort = 5220 - -// Server wraps the gortsplib server + the in-memory stream. type Server struct { - gs *gortsplib.Server - stream *gortsplib.ServerStream - media *description.Media - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - rtpConn *net.UDPConn + stream *gortsplib.ServerStream + media *description.Media + gs *gortsplib.Server + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup } -// handler implements the gortsplib ServerHandler* interfaces. -type handler struct { - srv *Server +// CreateServer now requires relay as input +func CreateServer(parentCtx context.Context, host string, port int, relay *libipcamera.RTPRelay) (*Server, error) { + ctx, cancel := context.WithCancel(parentCtx) + + stream, media, err := newH264Stream() + if err != nil { + cancel() + return nil, fmt.Errorf("create H264 stream: %w", err) + } + + srv := &Server{ + ctx: ctx, + cancel: cancel, + stream: stream, + media: media, + } + + h := &handler{srv: srv} + + gs := &gortsplib.Server{ + Handler: h, + RTSPAddress: fmt.Sprintf("%s:%d", host, port), + } + srv.gs = gs + + // 🧠 Pump from relay to RTSP + srv.wg.Add(1) + go srv.relayPump(relay) + + go func() { + log.Printf("RTSP server ready on rtsp://%s:%d/", host, port) + err := gs.StartAndWait() + if err != nil { + log.Printf("RTSP stopped: %v", err) + } + cancel() + }() + + return srv, nil } -// --- RTSP callbacks (multi-client capable) --- - -// OnDescribe: clients ask what the stream looks like (SDP). -func (h *handler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("RTSP DESCRIBE from %v path=%s", ctx.Conn.NetConn().RemoteAddr(), ctx.Path) - - return &base.Response{ - StatusCode: base.StatusOK, - }, h.srv.stream, nil -} - -// OnSetup: client wants to SUBSCRIBE to the existing stream. -func (h *handler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { - log.Printf("RTSP SETUP from %v path=%s", ctx.Conn.NetConn().RemoteAddr(), ctx.Path) - - return &base.Response{ - StatusCode: base.StatusOK, - }, h.srv.stream, nil -} - -// OnPlay: client starts receiving packets. -func (h *handler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { - log.Printf("RTSP PLAY from %v path=%s", ctx.Conn.NetConn().RemoteAddr(), ctx.Path) - - return &base.Response{ - StatusCode: base.StatusOK, - }, nil -} - -// (Optional) For logging / debugging: -func (h *handler) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { - log.Printf("RTSP connection opened: %v", ctx.Conn.NetConn().RemoteAddr()) -} - -func (h *handler) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) { - log.Printf("RTSP connection closed: %v (err=%v)", ctx.Conn.NetConn().RemoteAddr(), ctx.Error) -} - -// --- Public API --- - -// CreateServer creates and starts a RTSP server that serves *one* H.264 video -// stream, backed by a single UDP source (127.0.0.1:defaultRTPInPort). -// -// It keeps the original function signature from the repo: -// rtsp.CreateServer(applicationContext, host, port, camera) -// but the camera is NOT used directly here – you’re expected to start the -// RTP relay separately so that it forwards H.264 RTP packets into -// 127.0.0.1:defaultRTPInPort. -// -// Multiple RTSP clients are automatically supported: all of them read from -// the same gortsplib.ServerStream. -func CreateServer(parentCtx context.Context, host string, port int, _ *libipcamera.Camera) (*Server, error) { - ctx, cancel := context.WithCancel(parentCtx) - - // Build an in-memory H.264 stream description suitable for gortsplib. - stream, media, err := newH264Stream() - if err != nil { - cancel() - return nil, fmt.Errorf("create H264 stream: %w", err) - } - - // Listen for incoming RTP packets from the RTP relay. - rtpAddr := &net.UDPAddr{ - IP: net.ParseIP("127.0.0.1"), - Port: defaultRTPInPort, - } - - rtpConn, err := net.ListenUDP("udp", rtpAddr) - if err != nil { - cancel() - return nil, fmt.Errorf("listen udp %v: %w", rtpAddr, err) - } - - srv := &Server{ - stream: stream, - media: media, - ctx: ctx, - cancel: cancel, - rtpConn: rtpConn, - } - - h := &handler{srv: srv} - - // Configure gortsplib server. - gs := &gortsplib.Server{ - Handler: h, - RTSPAddress: fmt.Sprintf("%s:%d", host, port), - // If you also want UDP transport for clients, set UDPRTPAddress/UDPRTCPAddress here. - // For most use cases, TCP (interleaved over RTSP) is fine. - } - srv.gs = gs - - // Start the UDP → RTSP pump. - srv.wg.Add(1) - go srv.rtpPump() - - // Start RTSP server in the background. - go func() { - log.Printf("RTSP server listening on rtsp://%s:%d/ (gortsplib)", host, port) - - if err := gs.StartAndWait(); err != nil { - log.Printf("RTSP server stopped with error: %v", err) - } - cancel() - }() - - return srv, nil -} - -// Close shuts down the RTSP server and the RTP pump. func (s *Server) Close() { - s.cancel() - if s.gs != nil { - s.gs.Close() - } - if s.rtpConn != nil { - _ = s.rtpConn.Close() - } - s.wg.Wait() - if s.stream != nil { - s.stream.Close() - } + s.cancel() + if s.gs != nil { + s.gs.Close() + } + s.wg.Wait() + if s.stream != nil { + s.stream.Close() + } } -// --- internals --- +// Relay pump: consumes RTP from channel +func (s *Server) relayPump(relay *libipcamera.RTPRelay) { + defer s.wg.Done() + for { + select { + case <-s.ctx.Done(): + return + case frame, ok := <-relay.Output: + if !ok { + return + } + // Build RTP packet + var pkt rtp.Packet + if err := pkt.Unmarshal(frame.Payload); err != nil { + continue + } + s.stream.WritePacketRTP(s.media, &pkt) + } + } +} -// newH264Stream builds a single-video-track ServerStream with a valid clock rate. -// -// NOTE: -// * SPS / PPS here are "generic valid" values, not tuned to your camera. -// * For best results, you can parse the real SPS/PPS from your camera.sdp and -// drop them in here. +// Handler below… +type handler struct{ srv *Server } + +func (h *handler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { + return &base.Response{StatusCode: base.StatusOK}, h.srv.stream, nil +} + +func (h *handler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { + return &base.Response{StatusCode: base.StatusOK}, h.srv.stream, nil +} + +func (h *handler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { + return &base.Response{StatusCode: base.StatusOK}, nil +} + +// Build simple baseline H264 SDP func newH264Stream() (*gortsplib.ServerStream, *description.Media, error) { - // Generic baseline-profile SPS/PPS. They just need to be *valid* so that - // the library knows the clock rate and doesn't panic ("non-positive interval - // for NewTicker") :contentReference[oaicite:1]{index=1} - h264 := &format.H264{ - PayloadTyp: 96, - SPS: []byte{ - 0x67, 0x42, 0xC0, 0x1F, 0x96, 0x54, 0x05, 0x01, 0xED, 0x00, 0xF0, 0x88, 0x45, 0x80, - }, - PPS: []byte{ - 0x68, 0xCE, 0x38, 0x80, - }, - PacketizationMode: 1, - } + h264 := &format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + SPS: []byte{0x67, 0x42, 0xC0, 0x1F, 0x96, 0x54, 0x05, 0x01, 0xED, 0x00, 0xF0, 0x88, 0x45, 0x80}, + PPS: []byte{0x68, 0xCE, 0x38, 0x80}, + } - media := &description.Media{ - Type: description.MediaTypeVideo, - // You could also set media.Control if you want a specific track URL. - Formats: []format.Format{h264}, - } + media := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{h264}, + } - desc := &description.Session{ - Medias: []*description.Media{media}, - } + desc := &description.Session{ + Medias: []*description.Media{media}, + } - stream := &gortsplib.ServerStream{ - Desc: desc, - } - if err := stream.Initialize(); err != nil { - return nil, nil, err - } + stream := &gortsplib.ServerStream{Desc: desc} + if err := stream.Initialize(); err != nil { + return nil, nil, err + } - return stream, media, nil -} - -// rtpPump reads RTP packets from UDP and pushes them into the gortsplib stream. -// Every connected RTSP client gets the same packets (multi-client fan-out). -func (s *Server) rtpPump() { - defer s.wg.Done() - - buf := make([]byte, 2048) - - for { - select { - case <-s.ctx.Done(): - return - default: - } - - // Avoid blocking forever so we can react to shutdown. - _ = s.rtpConn.SetReadDeadline(time.Now().Add(2 * time.Second)) - - n, _, err := s.rtpConn.ReadFromUDP(buf) - if err != nil { - if ne, ok := err.(net.Error); ok && ne.Timeout() { - continue - } - // If the context is done, exit quietly. - if s.ctx.Err() != nil { - return - } - - log.Printf("RTP read error: %v", err) - continue - } - - var pkt rtp.Packet - if err := pkt.Unmarshal(buf[:n]); err != nil { - // Ignore malformed packets. - continue - } - - if err := s.stream.WritePacketRTP(s.media, &pkt); err != nil { - // This is non-fatal; clients can disconnect at any time. - log.Printf("WritePacketRTP error: %v", err) - } - } + return stream, media, nil }