package rtsp import ( "bytes" "context" "log" "strconv" "sync" "time" "github.com/bluenviron/gortsplib/v5" "github.com/bluenviron/gortsplib/v5/pkg/base" "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/format" "github.com/pion/rtp" "github.com/jonas-koeritz/actioncam/libipcamera" ) type Handler struct { server *gortsplib.Server stream *gortsplib.ServerStream mu sync.RWMutex } func (h *Handler) OnDescribe(_ *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { h.mu.RLock() defer h.mu.RUnlock() return &base.Response{StatusCode: base.StatusOK}, h.stream, nil } func (h *Handler) OnSetup(_ *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { h.mu.RLock() defer h.mu.RUnlock() return &base.Response{StatusCode: base.StatusOK}, h.stream, nil } func (h *Handler) OnPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { return &base.Response{StatusCode: base.StatusOK}, nil } type Server struct { server *gortsplib.Server stream *gortsplib.ServerStream cancel context.CancelFunc wg sync.WaitGroup } func CreateServer(ctx context.Context, ip string, port int, relay *libipcamera.RTPRelay) *Server { cctx, cancel := context.WithCancel(ctx) h := &Handler{} srv := &Server{cancel: cancel} // create RTSP server h.server = &gortsplib.Server{ Handler: h, RTSPAddress: ip + ":" + strconv.Itoa(port), UDPRTPAddress: ":8000", UDPRTCPAddress: ":8001", } if err := h.server.Start(); err != nil { panic(err) } // create SDP + stream h.mu.Lock() desc := &description.Session{ Medias: []*description.Media{{ Type: description.MediaTypeVideo, Formats: []format.Format{&format.H264{ PayloadTyp: 96, PacketizationMode: 1, }}, }}, } h.stream = &gortsplib.ServerStream{Server: h.server, Desc: desc} if err := h.stream.Initialize(); err != nil { panic(err) } srv.server = h.server srv.stream = h.stream h.mu.Unlock() // start streaming goroutine srv.startPump(cctx, relay, h) log.Printf("RTSP server ready: rtsp://%s:%d/", ip, port) return srv } // ---- STREAMING / H264 → RTP ---- func (s *Server) startPump(cctx context.Context, relay *libipcamera.RTPRelay, h *Handler) { s.wg.Add(1) go func() { defer s.wg.Done() var seq uint16 var lastTS uint32 var sps, pps []byte for { select { case <-cctx.Done(): return case frame, ok := <-relay.Frames: if !ok { return } var ts = frame.Elapsed * 90 if lastTS != 0 && frame.Elapsed > lastTS { delta := frame.Elapsed - lastTS time.Sleep(time.Duration(delta) * time.Millisecond) } lastTS = frame.Elapsed nalus := splitNALUnits(frame.Data) for _, nalu := range nalus { if len(nalu) < 1 { continue } nalType := nalu[0] & 0x1F switch nalType { case 7: // SPS sps = append([]byte{}, nalu...) continue case 8: // PPS pps = append([]byte{}, nalu...) continue } if nalType == 5 && sps != nil && pps != nil { sendNALUnit(h, s.stream, sps, &seq, &ts) sendNALUnit(h, s.stream, pps, &seq, &ts) } sendNALUnit(h, s.stream, nalu, &seq, &ts) } } } }() } func splitNALUnits(frame []byte) [][]byte { var nalus [][]byte for { idx := bytes.Index(frame, []byte{0, 0, 0, 1}) if idx == -1 { if len(frame) > 0 { nalus = append(nalus, frame) } return nalus } if idx != 0 { nalus = append(nalus, frame[:idx]) } frame = frame[idx+4:] } } func sendNALUnit(h *Handler, stream *gortsplib.ServerStream, nalu []byte, seq *uint16, ts *uint32) { const maxPayload = 1200 if len(nalu) <= maxPayload { writeRTP(h, stream, nalu, true, seq, ts) return } // FU-A fragmentation first := true naluHeader := nalu[0] payload := nalu[1:] for len(payload) > 0 { size := maxPayload if len(payload) < size { size = len(payload) } fuHeader := byte(28) | (naluHeader & 0x60) startFlag := byte(0x80) endFlag := byte(0x40) b := []byte{fuHeader, 0} b[1] = naluHeader & 0x1F if first { b[1] |= startFlag first = false } else if len(payload) <= size { b[1] |= endFlag } packet := append(b, payload[:size]...) writeRTP(h, stream, packet, len(payload) <= size, seq, ts) payload = payload[size:] } } func writeRTP(h *Handler, stream *gortsplib.ServerStream, payload []byte, marker bool, seq *uint16, ts *uint32) { pkt := &rtp.Packet{ Header: rtp.Header{ Version: 2, Marker: marker, PayloadType: 96, SequenceNumber: *seq, Timestamp: *ts, }, Payload: payload, } stream.WritePacketRTP(stream.Desc.Medias[0], pkt) *seq++ } // ---- SHUTDOWN ---- func (s *Server) Stop() { s.cancel() s.server.Close() s.stream.Close() s.wg.Wait() }