Fixes with RTPRelay and actioncam

Endless loop in RTPRelay and exiting from whole app using Ctrl+C
This commit is contained in:
fildaw 2020-01-05 15:53:45 +01:00 committed by GitHub
parent e82f5a97c9
commit 205c9896dc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 55 deletions

View file

@ -71,6 +71,7 @@ func main() {
case sig := <-signalChannel: case sig := <-signalChannel:
log.Printf("Got signal %s, exiting...\n", sig) log.Printf("Got signal %s, exiting...\n", sig)
cancel() cancel()
os.Exit(0)
} }
}(cancel) }(cancel)

View file

@ -20,6 +20,8 @@ type RTPRelay struct {
context context.Context context context.Context
} }
var close bool
// CreateRTPRelay creates a UDP listener that handles live data // CreateRTPRelay creates a UDP listener that handles live data
// from the camera and forwards it as an RTP stream // from the camera and forwards it as an RTP stream
func CreateRTPRelay(ctx context.Context, targetAddress net.IP, targetPort int) *RTPRelay { func CreateRTPRelay(ctx context.Context, targetAddress net.IP, targetPort int) *RTPRelay {
@ -29,7 +31,9 @@ func CreateRTPRelay(ctx context.Context, targetAddress net.IP, targetPort int) *
log.Printf("ERROR: %s\n", err) log.Printf("ERROR: %s\n", err)
} }
close = false
relay := RTPRelay{ relay := RTPRelay{
close: false,
targetIP: targetAddress, targetIP: targetAddress,
targetPort: targetPort, targetPort: targetPort,
listener: conn, listener: conn,
@ -66,76 +70,77 @@ func handleCameraStream(relay RTPRelay, conn net.PacketConn) {
frameBuffer := bytes.Buffer{} frameBuffer := bytes.Buffer{}
packetBuffer := bytes.Buffer{} packetBuffer := bytes.Buffer{}
T:
for {
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
for { select {
conn.SetReadDeadline(time.Now().Add(10 * time.Second)) case <-relay.context.Done():
log.Println("Context Done")
select {
case <-relay.context.Done():
log.Println("Context Done")
rtpConn.Close()
relay.listener.Close()
break
default:
if relay.close {
rtpConn.Close() rtpConn.Close()
relay.listener.Close() relay.listener.Close()
break break T
} default:
if close {
rtpConn.Close()
relay.listener.Close()
break T
}
conn.ReadFrom(buffer) conn.ReadFrom(buffer)
packetReader.Reset(buffer) packetReader.Reset(buffer)
binary.Read(packetReader, binary.BigEndian, &header) binary.Read(packetReader, binary.BigEndian, &header)
if header.Magic != 0xBCDE { if header.Magic != 0xBCDE {
log.Printf("Received message with invalid magic (%x).", header.Magic) log.Printf("Received message with invalid magic (%x).", header.Magic)
break
}
if header.Length > 0 {
payload = make([]byte, header.Length)
_, err := io.ReadFull(packetReader, payload)
if err != nil {
log.Printf("Read Error: %s\n", err)
break break
} }
} else {
payload = []byte{}
}
switch header.MessageType { if header.Length > 0 {
case 0x0001: // H.264 Data payload = make([]byte, header.Length)
frameBuffer.Write(payload) _, err := io.ReadFull(packetReader, payload)
case 0x0002: // Time if err != nil {
// Append the Framebuffer log.Printf("Read Error: %s\n", err)
packetBuffer.Write(frameBuffer.Bytes()) break
}
} else {
payload = []byte{}
}
// Send out the packet switch header.MessageType {
rtpConn.Write(packetBuffer.Bytes()) case 0x0001: // H.264 Data
frameBuffer.Write(payload)
case 0x0002: // Time
// Append the Framebuffer
packetBuffer.Write(frameBuffer.Bytes())
// Prepare the next packet // Send out the packet
packetBuffer.Reset() rtpConn.Write(packetBuffer.Bytes())
packetBuffer.Write([]byte{0x80, 0x63})
binary.Write(&packetBuffer, binary.BigEndian, sequenceNumber+1)
binary.Write(&packetBuffer, binary.BigEndian, (uint32)(elapsed)*90)
binary.Write(&packetBuffer, binary.BigEndian, (uint64(0)))
// Reset the Framebuffer // Prepare the next packet
frameBuffer.Reset() packetBuffer.Reset()
sequenceNumber++ packetBuffer.Write([]byte{0x80, 0x63})
binary.Write(&packetBuffer, binary.BigEndian, sequenceNumber+1)
binary.Write(&packetBuffer, binary.BigEndian, (uint32)(elapsed)*90)
binary.Write(&packetBuffer, binary.BigEndian, (uint64(0)))
elapsed = binary.LittleEndian.Uint32(payload[12:]) // Reset the Framebuffer
default: frameBuffer.Reset()
log.Printf("Received Unknown Message: %+v\n", header) sequenceNumber++
log.Printf("Payload:\n%s\n", hex.Dump(payload))
elapsed = binary.LittleEndian.Uint32(payload[12:])
default:
log.Printf("Received Unknown Message: %+v\n", header)
log.Printf("Payload:\n%s\n", hex.Dump(payload))
}
} }
} }
}
} }
// Stop stops listening for packets // Stop stops listening for packets
func (r *RTPRelay) Stop() { func (r *RTPRelay) Stop() {
r.listener.Close() close = true
r.close = true r.close = true
r.listener.Close()
} }