second forcepush this away if i fail

This commit is contained in:
Soph :3 2025-11-24 20:41:43 +02:00
parent d1f1aae0d3
commit 9e5cd110e0
3 changed files with 201 additions and 316 deletions

View file

@ -55,7 +55,7 @@ func main() {
Args: cobra.MaximumNArgs(1), Args: cobra.MaximumNArgs(1),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
defer camera.Disconnect() defer camera.Disconnect()
relay := libipcamera.CreateRTPRelay(applicationContext, net.ParseIP("127.0.0.1"), 5220) relay := libipcamera.CreateRTPRelay(applicationContext)
defer relay.Stop() defer relay.Stop()
camera.StartPreviewStream() camera.StartPreviewStream()
@ -271,10 +271,15 @@ func main() {
if(host == "") { if(host == "") {
host = "127.0.0.1" host = "127.0.0.1"
} }
relay := libipcamera.CreateRTPRelay(applicationContext)
rtspServer, err := rtsp.CreateServer(applicationContext, host, port, camera) rtspServer, err := rtsp.CreateServer(applicationContext, host, port, relay)
defer rtspServer.Close() defer rtspServer.Close()
if err := camera.StartPreviewStream(); err != nil {
log.Printf("ERROR preview: %v", err)
return
}
log.Printf("Created RTSP Server\n") log.Printf("Created RTSP Server\n")
if err != nil { if err != nil {

View file

@ -11,36 +11,36 @@ import (
"time" "time"
) )
// Frame is a decoded RTP packet buffer forwarded to the RTSP server
type Frame struct {
Payload []byte
}
// RTPRelay holds information on the relaying stream listener // RTPRelay holds information on the relaying stream listener
type RTPRelay struct { type RTPRelay struct {
close bool close bool
targetIP net.IP
targetPort int
listener net.PacketConn listener net.PacketConn
context context.Context context context.Context
Output chan Frame // 🚨 NEW: channel to push decoded RTP frames
} }
var close bool var closeFlag bool
// CreateRTPRelay creates a UDP listener that handles live data // CreateRTPRelay creates a UDP listener that handles live data
// from the camera and forwards it as an RTP stream // and pushes RTP-ready payloads into Output channel.
func CreateRTPRelay(ctx context.Context, targetAddress net.IP, targetPort int) *RTPRelay { func CreateRTPRelay(ctx context.Context) *RTPRelay {
conn, err := net.ListenPacket("udp", ":6669") conn, err := net.ListenPacket("udp", ":6669")
if err != nil { if err != nil {
log.Printf("ERROR: %s\n", err) log.Printf("ERROR: %s\n", err)
} }
close = false closeFlag = false
relay := RTPRelay{ relay := RTPRelay{
close: false, close: false,
targetIP: targetAddress,
targetPort: targetPort,
listener: conn, listener: conn,
context: ctx, context: ctx,
} Output: make(chan Frame, 100), // buffered for smoother streaming
if err != nil {
log.Printf("ERROR: %s\n", err)
} }
go handleCameraStream(relay, conn) go handleCameraStream(relay, conn)
@ -55,21 +55,12 @@ func handleCameraStream(relay RTPRelay, conn net.PacketConn) {
header := streamHeader{} header := streamHeader{}
var payload []byte var payload []byte
rtpTarget := net.UDPAddr{
IP: relay.targetIP,
Port: relay.targetPort,
}
rtpSource, _ := net.ResolveUDPAddr("udp", "127.0.0.1")
rtpConn, err := net.DialUDP("udp", rtpSource, &rtpTarget)
if err != nil {
log.Printf("ERROR creating RTP sender: %s\n", err)
}
var sequenceNumber uint16 var sequenceNumber uint16
var elapsed uint32 var elapsed uint32
frameBuffer := bytes.Buffer{} frameBuffer := bytes.Buffer{}
packetBuffer := bytes.Buffer{} packetBuffer := bytes.Buffer{}
T: T:
for { for {
conn.SetReadDeadline(time.Now().Add(10 * time.Second)) conn.SetReadDeadline(time.Now().Add(10 * time.Second))
@ -77,13 +68,13 @@ func handleCameraStream(relay RTPRelay, conn net.PacketConn) {
select { select {
case <-relay.context.Done(): case <-relay.context.Done():
log.Println("Context Done") log.Println("Context Done")
rtpConn.Close()
relay.listener.Close() relay.listener.Close()
close(relay.Output)
break T break T
default: default:
if close { if closeFlag {
rtpConn.Close()
relay.listener.Close() relay.listener.Close()
close(relay.Output)
break T break T
} }
@ -111,36 +102,38 @@ func handleCameraStream(relay RTPRelay, conn net.PacketConn) {
switch header.MessageType { switch header.MessageType {
case 0x0001: // H.264 Data case 0x0001: // H.264 Data
frameBuffer.Write(payload) frameBuffer.Write(payload)
case 0x0002: // Time case 0x0002: // Time
// Append the Framebuffer // Append full frame
packetBuffer.Write(frameBuffer.Bytes()) packetBuffer.Write(frameBuffer.Bytes())
// Send out the packet // Push to RTSP server (in-memory)
rtpConn.Write(packetBuffer.Bytes()) relay.Output <- Frame{Payload: append([]byte{}, packetBuffer.Bytes()...)}
// Prepare the next packet // Reset the next packet
packetBuffer.Reset() packetBuffer.Reset()
packetBuffer.Write([]byte{0x80, 0x63}) packetBuffer.Write([]byte{0x80, 0x63})
binary.Write(&packetBuffer, binary.BigEndian, sequenceNumber+1) binary.Write(&packetBuffer, binary.BigEndian, sequenceNumber+1)
binary.Write(&packetBuffer, binary.BigEndian, (uint32)(elapsed)*90) binary.Write(&packetBuffer, binary.BigEndian, (uint32)(elapsed)*90)
binary.Write(&packetBuffer, binary.BigEndian, (uint64(0))) binary.Write(&packetBuffer, binary.BigEndian, (uint64(0)))
// Reset the Framebuffer
frameBuffer.Reset() frameBuffer.Reset()
sequenceNumber++ sequenceNumber++
elapsed = binary.LittleEndian.Uint32(payload[12:]) elapsed = binary.LittleEndian.Uint32(payload[12:])
default: default:
log.Printf("Received Unknown Message: %+v\n", header) log.Printf("Received Unknown Message: %+v\n", header)
log.Printf("Payload:\n%s\n", hex.Dump(payload)) log.Printf("Payload:\n%s\n", hex.Dump(payload))
} }
} }
} }
close(relay.Output)
} }
// Stop stops listening for packets // Stop stops listening for packets
func (r *RTPRelay) Stop() { func (r *RTPRelay) Stop() {
close = true closeFlag = true
r.close = true r.close = true
r.listener.Close() r.listener.Close()
} }

View file

@ -4,144 +4,60 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"net"
"sync" "sync"
"time"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format"
// We keep this import so the CreateServer() signature matches the original
// repo (actioncam.go calls rtsp.CreateServer(ctx, host, port, camera)).
"github.com/jonas-koeritz/actioncam/libipcamera" "github.com/jonas-koeritz/actioncam/libipcamera"
) )
// This is where your RTP relay should send H.264 packets.
// The original project uses port 5220 for the preview stream.
const defaultRTPInPort = 5220
// Server wraps the gortsplib server + the in-memory stream.
type Server struct { type Server struct {
gs *gortsplib.Server
stream *gortsplib.ServerStream stream *gortsplib.ServerStream
media *description.Media media *description.Media
gs *gortsplib.Server
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
wg sync.WaitGroup wg sync.WaitGroup
rtpConn *net.UDPConn
} }
// handler implements the gortsplib ServerHandler* interfaces. // CreateServer now requires relay as input
type handler struct { func CreateServer(parentCtx context.Context, host string, port int, relay *libipcamera.RTPRelay) (*Server, error) {
srv *Server
}
// --- RTSP callbacks (multi-client capable) ---
// OnDescribe: clients ask what the stream looks like (SDP).
func (h *handler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("RTSP DESCRIBE from %v path=%s", ctx.Conn.NetConn().RemoteAddr(), ctx.Path)
return &base.Response{
StatusCode: base.StatusOK,
}, h.srv.stream, nil
}
// OnSetup: client wants to SUBSCRIBE to the existing stream.
func (h *handler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("RTSP SETUP from %v path=%s", ctx.Conn.NetConn().RemoteAddr(), ctx.Path)
return &base.Response{
StatusCode: base.StatusOK,
}, h.srv.stream, nil
}
// OnPlay: client starts receiving packets.
func (h *handler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
log.Printf("RTSP PLAY from %v path=%s", ctx.Conn.NetConn().RemoteAddr(), ctx.Path)
return &base.Response{
StatusCode: base.StatusOK,
}, nil
}
// (Optional) For logging / debugging:
func (h *handler) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
log.Printf("RTSP connection opened: %v", ctx.Conn.NetConn().RemoteAddr())
}
func (h *handler) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
log.Printf("RTSP connection closed: %v (err=%v)", ctx.Conn.NetConn().RemoteAddr(), ctx.Error)
}
// --- Public API ---
// CreateServer creates and starts a RTSP server that serves *one* H.264 video
// stream, backed by a single UDP source (127.0.0.1:defaultRTPInPort).
//
// It keeps the original function signature from the repo:
// rtsp.CreateServer(applicationContext, host, port, camera)
// but the camera is NOT used directly here youre expected to start the
// RTP relay separately so that it forwards H.264 RTP packets into
// 127.0.0.1:defaultRTPInPort.
//
// Multiple RTSP clients are automatically supported: all of them read from
// the same gortsplib.ServerStream.
func CreateServer(parentCtx context.Context, host string, port int, _ *libipcamera.Camera) (*Server, error) {
ctx, cancel := context.WithCancel(parentCtx) ctx, cancel := context.WithCancel(parentCtx)
// Build an in-memory H.264 stream description suitable for gortsplib.
stream, media, err := newH264Stream() stream, media, err := newH264Stream()
if err != nil { if err != nil {
cancel() cancel()
return nil, fmt.Errorf("create H264 stream: %w", err) return nil, fmt.Errorf("create H264 stream: %w", err)
} }
// Listen for incoming RTP packets from the RTP relay.
rtpAddr := &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: defaultRTPInPort,
}
rtpConn, err := net.ListenUDP("udp", rtpAddr)
if err != nil {
cancel()
return nil, fmt.Errorf("listen udp %v: %w", rtpAddr, err)
}
srv := &Server{ srv := &Server{
stream: stream,
media: media,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
rtpConn: rtpConn, stream: stream,
media: media,
} }
h := &handler{srv: srv} h := &handler{srv: srv}
// Configure gortsplib server.
gs := &gortsplib.Server{ gs := &gortsplib.Server{
Handler: h, Handler: h,
RTSPAddress: fmt.Sprintf("%s:%d", host, port), RTSPAddress: fmt.Sprintf("%s:%d", host, port),
// If you also want UDP transport for clients, set UDPRTPAddress/UDPRTCPAddress here.
// For most use cases, TCP (interleaved over RTSP) is fine.
} }
srv.gs = gs srv.gs = gs
// Start the UDP → RTSP pump. // 🧠 Pump from relay to RTSP
srv.wg.Add(1) srv.wg.Add(1)
go srv.rtpPump() go srv.relayPump(relay)
// Start RTSP server in the background.
go func() { go func() {
log.Printf("RTSP server listening on rtsp://%s:%d/ (gortsplib)", host, port) log.Printf("RTSP server ready on rtsp://%s:%d/", host, port)
err := gs.StartAndWait()
if err := gs.StartAndWait(); err != nil { if err != nil {
log.Printf("RTSP server stopped with error: %v", err) log.Printf("RTSP stopped: %v", err)
} }
cancel() cancel()
}() }()
@ -149,47 +65,64 @@ func CreateServer(parentCtx context.Context, host string, port int, _ *libipcame
return srv, nil return srv, nil
} }
// Close shuts down the RTSP server and the RTP pump.
func (s *Server) Close() { func (s *Server) Close() {
s.cancel() s.cancel()
if s.gs != nil { if s.gs != nil {
s.gs.Close() s.gs.Close()
} }
if s.rtpConn != nil {
_ = s.rtpConn.Close()
}
s.wg.Wait() s.wg.Wait()
if s.stream != nil { if s.stream != nil {
s.stream.Close() s.stream.Close()
} }
} }
// --- internals --- // Relay pump: consumes RTP from channel
func (s *Server) relayPump(relay *libipcamera.RTPRelay) {
defer s.wg.Done()
for {
select {
case <-s.ctx.Done():
return
case frame, ok := <-relay.Output:
if !ok {
return
}
// Build RTP packet
var pkt rtp.Packet
if err := pkt.Unmarshal(frame.Payload); err != nil {
continue
}
s.stream.WritePacketRTP(s.media, &pkt)
}
}
}
// newH264Stream builds a single-video-track ServerStream with a valid clock rate. // Handler below…
// type handler struct{ srv *Server }
// NOTE:
// * SPS / PPS here are "generic valid" values, not tuned to your camera. func (h *handler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
// * For best results, you can parse the real SPS/PPS from your camera.sdp and return &base.Response{StatusCode: base.StatusOK}, h.srv.stream, nil
// drop them in here. }
func (h *handler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
return &base.Response{StatusCode: base.StatusOK}, h.srv.stream, nil
}
func (h *handler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{StatusCode: base.StatusOK}, nil
}
// Build simple baseline H264 SDP
func newH264Stream() (*gortsplib.ServerStream, *description.Media, error) { func newH264Stream() (*gortsplib.ServerStream, *description.Media, error) {
// Generic baseline-profile SPS/PPS. They just need to be *valid* so that
// the library knows the clock rate and doesn't panic ("non-positive interval
// for NewTicker") :contentReference[oaicite:1]{index=1}
h264 := &format.H264{ h264 := &format.H264{
PayloadTyp: 96, PayloadTyp: 96,
SPS: []byte{
0x67, 0x42, 0xC0, 0x1F, 0x96, 0x54, 0x05, 0x01, 0xED, 0x00, 0xF0, 0x88, 0x45, 0x80,
},
PPS: []byte{
0x68, 0xCE, 0x38, 0x80,
},
PacketizationMode: 1, PacketizationMode: 1,
SPS: []byte{0x67, 0x42, 0xC0, 0x1F, 0x96, 0x54, 0x05, 0x01, 0xED, 0x00, 0xF0, 0x88, 0x45, 0x80},
PPS: []byte{0x68, 0xCE, 0x38, 0x80},
} }
media := &description.Media{ media := &description.Media{
Type: description.MediaTypeVideo, Type: description.MediaTypeVideo,
// You could also set media.Control if you want a specific track URL.
Formats: []format.Format{h264}, Formats: []format.Format{h264},
} }
@ -197,56 +130,10 @@ func newH264Stream() (*gortsplib.ServerStream, *description.Media, error) {
Medias: []*description.Media{media}, Medias: []*description.Media{media},
} }
stream := &gortsplib.ServerStream{ stream := &gortsplib.ServerStream{Desc: desc}
Desc: desc,
}
if err := stream.Initialize(); err != nil { if err := stream.Initialize(); err != nil {
return nil, nil, err return nil, nil, err
} }
return stream, media, nil return stream, media, nil
} }
// rtpPump reads RTP packets from UDP and pushes them into the gortsplib stream.
// Every connected RTSP client gets the same packets (multi-client fan-out).
func (s *Server) rtpPump() {
defer s.wg.Done()
buf := make([]byte, 2048)
for {
select {
case <-s.ctx.Done():
return
default:
}
// Avoid blocking forever so we can react to shutdown.
_ = s.rtpConn.SetReadDeadline(time.Now().Add(2 * time.Second))
n, _, err := s.rtpConn.ReadFromUDP(buf)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
continue
}
// If the context is done, exit quietly.
if s.ctx.Err() != nil {
return
}
log.Printf("RTP read error: %v", err)
continue
}
var pkt rtp.Packet
if err := pkt.Unmarshal(buf[:n]); err != nil {
// Ignore malformed packets.
continue
}
if err := s.stream.WritePacketRTP(s.media, &pkt); err != nil {
// This is non-fatal; clients can disconnect at any time.
log.Printf("WritePacketRTP error: %v", err)
}
}
}