five
This commit is contained in:
parent
7eec0f88e4
commit
403dfc5092
5 changed files with 179 additions and 262 deletions
|
|
@ -4,136 +4,101 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Frame is a decoded RTP packet buffer forwarded to the RTSP server
|
||||
type Frame struct {
|
||||
Payload []byte
|
||||
Data []byte
|
||||
}
|
||||
|
||||
// RTPRelay holds information on the relaying stream listener
|
||||
type RTPRelay struct {
|
||||
close bool
|
||||
close bool
|
||||
listener net.PacketConn
|
||||
context context.Context
|
||||
Output chan Frame // 🚨 NEW: channel to push decoded RTP frames
|
||||
Context context.Context
|
||||
Frames chan Frame
|
||||
}
|
||||
|
||||
var closeFlag bool
|
||||
|
||||
// CreateRTPRelay creates a UDP listener that handles live data
|
||||
// and pushes RTP-ready payloads into Output channel.
|
||||
func CreateRTPRelay(ctx context.Context) *RTPRelay {
|
||||
conn, err := net.ListenPacket("udp", ":6669")
|
||||
|
||||
if err != nil {
|
||||
log.Printf("ERROR: %s\n", err)
|
||||
}
|
||||
|
||||
closeFlag = false
|
||||
relay := RTPRelay{
|
||||
relay := &RTPRelay{
|
||||
close: false,
|
||||
listener: conn,
|
||||
context: ctx,
|
||||
Output: make(chan Frame, 100), // buffered for smoother streaming
|
||||
Context: ctx,
|
||||
Frames: make(chan Frame, 30),
|
||||
}
|
||||
|
||||
go handleCameraStream(relay, conn)
|
||||
go relay.readLoop()
|
||||
|
||||
return &relay
|
||||
return relay
|
||||
}
|
||||
|
||||
func handleCameraStream(relay RTPRelay, conn net.PacketConn) {
|
||||
buffer := make([]byte, 2048)
|
||||
packetReader := bytes.NewReader(buffer)
|
||||
|
||||
func (r *RTPRelay) readLoop() {
|
||||
buf := make([]byte, 2048)
|
||||
packetReader := bytes.NewReader(buf)
|
||||
header := streamHeader{}
|
||||
var payload []byte
|
||||
|
||||
var sequenceNumber uint16
|
||||
var elapsed uint32
|
||||
|
||||
frameBuffer := bytes.Buffer{}
|
||||
packetBuffer := bytes.Buffer{}
|
||||
|
||||
T:
|
||||
for {
|
||||
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
|
||||
r.listener.SetReadDeadline(time.Now().Add(10 * time.Second))
|
||||
|
||||
select {
|
||||
case <-relay.context.Done():
|
||||
log.Println("Context Done")
|
||||
relay.listener.Close()
|
||||
close(relay.Output)
|
||||
break T
|
||||
case <-r.Context.Done():
|
||||
close(r.Frames)
|
||||
return
|
||||
default:
|
||||
if closeFlag {
|
||||
relay.listener.Close()
|
||||
close(relay.Output)
|
||||
break T
|
||||
}
|
||||
|
||||
_, _, err := r.listener.ReadFrom(buf)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
packetReader.Reset(buf)
|
||||
|
||||
binary.Read(packetReader, binary.BigEndian, &header)
|
||||
|
||||
if header.Magic != 0xBCDE {
|
||||
continue
|
||||
}
|
||||
|
||||
if header.Length > 0 {
|
||||
payload = make([]byte, header.Length)
|
||||
_, err := io.ReadFull(packetReader, payload)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
payload = []byte{}
|
||||
}
|
||||
|
||||
conn.ReadFrom(buffer)
|
||||
packetReader.Reset(buffer)
|
||||
|
||||
binary.Read(packetReader, binary.BigEndian, &header)
|
||||
|
||||
if header.Magic != 0xBCDE {
|
||||
log.Printf("Received message with invalid magic (%x).", header.Magic)
|
||||
break
|
||||
}
|
||||
|
||||
if header.Length > 0 {
|
||||
payload = make([]byte, header.Length)
|
||||
_, err := io.ReadFull(packetReader, payload)
|
||||
if err != nil {
|
||||
log.Printf("Read Error: %s\n", err)
|
||||
break
|
||||
}
|
||||
} else {
|
||||
payload = []byte{}
|
||||
}
|
||||
|
||||
switch header.MessageType {
|
||||
case 0x0001: // H.264 Data
|
||||
frameBuffer.Write(payload)
|
||||
|
||||
case 0x0002: // Time
|
||||
// Append full frame
|
||||
packetBuffer.Write(frameBuffer.Bytes())
|
||||
|
||||
// Push to RTSP server (in-memory)
|
||||
relay.Output <- Frame{Payload: append([]byte{}, packetBuffer.Bytes()...)}
|
||||
|
||||
// Reset the next packet
|
||||
packetBuffer.Reset()
|
||||
packetBuffer.Write([]byte{0x80, 0x63})
|
||||
binary.Write(&packetBuffer, binary.BigEndian, sequenceNumber+1)
|
||||
binary.Write(&packetBuffer, binary.BigEndian, (uint32)(elapsed)*90)
|
||||
binary.Write(&packetBuffer, binary.BigEndian, (uint64(0)))
|
||||
|
||||
switch header.MessageType {
|
||||
case 0x0001:
|
||||
frameBuffer.Write(payload)
|
||||
case 0x0002:
|
||||
// Emit a full H264 frame
|
||||
if frameBuffer.Len() > 0 {
|
||||
cp := append([]byte{}, frameBuffer.Bytes()...)
|
||||
r.Frames <- Frame{Data: cp}
|
||||
frameBuffer.Reset()
|
||||
sequenceNumber++
|
||||
elapsed = binary.LittleEndian.Uint32(payload[12:])
|
||||
|
||||
default:
|
||||
log.Printf("Received Unknown Message: %+v\n", header)
|
||||
log.Printf("Payload:\n%s\n", hex.Dump(payload))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
close(relay.Output)
|
||||
}
|
||||
|
||||
// Stop stops listening for packets
|
||||
func (r *RTPRelay) Stop() {
|
||||
closeFlag = true
|
||||
r.close = true
|
||||
r.listener.Close()
|
||||
close(r.Frames)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue