Fixes #2, allow server goroutines to exit on deadlines and context cancel.

This commit is contained in:
Jonas Köritz 2019-10-15 12:59:06 +02:00
parent 03ca0f77e6
commit e82f5a97c9
5 changed files with 108 additions and 58 deletions

View file

@ -2,6 +2,7 @@ package main
import ( import (
"bufio" "bufio"
"context"
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
@ -10,6 +11,7 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"os/signal"
"path/filepath" "path/filepath"
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
@ -20,10 +22,15 @@ import (
) )
func connectAndLogin(ip net.IP, port int, username, password string, verbose bool) *libipcamera.Camera { func connectAndLogin(ip net.IP, port int, username, password string, verbose bool) *libipcamera.Camera {
camera := libipcamera.CreateCamera(ip, port, username, password) camera, err := libipcamera.CreateCamera(ip, port, username, password)
if err != nil {
log.Printf("ERROR instantiating camera: %s\n", err)
os.Exit(1)
}
camera.SetVerbose(verbose) camera.SetVerbose(verbose)
camera.Connect() camera.Connect()
camera.Login() camera.Login()
return camera return camera
} }
@ -39,13 +46,15 @@ func main() {
var camera *libipcamera.Camera var camera *libipcamera.Camera
var applicationContext context.Context
var rootCmd = &cobra.Command{ var rootCmd = &cobra.Command{
Use: "actioncam [Cameras IP Address]", Use: "actioncam [Cameras IP Address]",
Short: "actioncam is a tool to stream the video preview of cheap action cameras without the mobile application", Short: "actioncam is a tool to stream the video preview of cheap action cameras without the mobile application",
Args: cobra.MaximumNArgs(1), Args: cobra.MaximumNArgs(1),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
defer camera.Disconnect() defer camera.Disconnect()
relay := libipcamera.CreateRTPRelay(net.ParseIP("127.0.0.1"), 5220) relay := libipcamera.CreateRTPRelay(applicationContext, net.ParseIP("127.0.0.1"), 5220)
defer relay.Stop() defer relay.Stop()
camera.StartPreviewStream() camera.StartPreviewStream()
@ -53,6 +62,18 @@ func main() {
bufio.NewReader(os.Stdin).ReadBytes('\n') bufio.NewReader(os.Stdin).ReadBytes('\n')
}, },
PersistentPreRun: func(cmd *cobra.Command, args []string) { PersistentPreRun: func(cmd *cobra.Command, args []string) {
signalChannel := make(chan os.Signal)
signal.Notify(signalChannel, os.Interrupt)
var cancel context.CancelFunc
applicationContext, cancel = context.WithCancel(context.Background())
go func(cancel context.CancelFunc) {
select {
case sig := <-signalChannel:
log.Printf("Got signal %s, exiting...\n", sig)
cancel()
}
}(cancel)
if cpuprofile != "" { if cpuprofile != "" {
cpuprofileFile, err := os.Create(cpuprofile) cpuprofileFile, err := os.Create(cpuprofile)
if err != nil { if err != nil {
@ -232,8 +253,9 @@ func main() {
Short: "Start an RTSP-Server serving the cameras preview.", Short: "Start an RTSP-Server serving the cameras preview.",
Args: cobra.MaximumNArgs(1), Args: cobra.MaximumNArgs(1),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
rtspServer := rtsp.CreateServer(applicationContext, "127.0.0.1", 8554, camera)
defer rtspServer.Stop()
rtspServer := rtsp.CreateServer("127.0.0.1", 8554, camera)
log.Printf("Created RTSP Server\n") log.Printf("Created RTSP Server\n")
err := rtspServer.ListenAndServe() err := rtspServer.ListenAndServe()

View file

@ -61,7 +61,10 @@ type StoredFile struct {
} }
// CreateCamera creates a new Camera instance // CreateCamera creates a new Camera instance
func CreateCamera(ipAddress net.IP, port int, username, password string) *Camera { func CreateCamera(ipAddress net.IP, port int, username, password string) (*Camera, error) {
if ipAddress == nil {
return nil, errors.New("Cannot create camera without an IP-Address")
}
camera := &Camera{ camera := &Camera{
ipAddress: ipAddress, ipAddress: ipAddress,
port: port, port: port,
@ -70,7 +73,7 @@ func CreateCamera(ipAddress net.IP, port int, username, password string) *Camera
messageHandlers: make(map[uint32][]MessageHandler, 0), messageHandlers: make(map[uint32][]MessageHandler, 0),
verbose: true, verbose: true,
} }
return camera return camera, nil
} }
// Connect to the camera and start responding to keepalive packets // Connect to the camera and start responding to keepalive packets

View file

@ -13,6 +13,8 @@ var targetPorts = []int{22600, 21600}
// AutodiscoverCamera will try to find a camera using UDP Broadcasts // AutodiscoverCamera will try to find a camera using UDP Broadcasts
func AutodiscoverCamera(verbose bool) (net.IP, error) { func AutodiscoverCamera(verbose bool) (net.IP, error) {
conn, err := net.ListenPacket("udp", ":22601") conn, err := net.ListenPacket("udp", ":22601")
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -2,11 +2,13 @@ package libipcamera
import ( import (
"bytes" "bytes"
"context"
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"io" "io"
"log" "log"
"net" "net"
"time"
) )
// RTPRelay holds information on the relaying stream listener // RTPRelay holds information on the relaying stream listener
@ -15,11 +17,12 @@ type RTPRelay struct {
targetIP net.IP targetIP net.IP
targetPort int targetPort int
listener net.PacketConn listener net.PacketConn
context context.Context
} }
// CreateRTPRelay creates a UDP listener that handles live data // CreateRTPRelay creates a UDP listener that handles live data
// from the camera and forwards it as an RTP stream // from the camera and forwards it as an RTP stream
func CreateRTPRelay(targetAddress net.IP, targetPort int) *RTPRelay { func CreateRTPRelay(ctx context.Context, targetAddress net.IP, targetPort int) *RTPRelay {
conn, err := net.ListenPacket("udp", ":6669") conn, err := net.ListenPacket("udp", ":6669")
if err != nil { if err != nil {
@ -30,6 +33,7 @@ func CreateRTPRelay(targetAddress net.IP, targetPort int) *RTPRelay {
targetIP: targetAddress, targetIP: targetAddress,
targetPort: targetPort, targetPort: targetPort,
listener: conn, listener: conn,
context: ctx,
} }
if err != nil { if err != nil {
log.Printf("ERROR: %s\n", err) log.Printf("ERROR: %s\n", err)
@ -64,60 +68,70 @@ func handleCameraStream(relay RTPRelay, conn net.PacketConn) {
packetBuffer := bytes.Buffer{} packetBuffer := bytes.Buffer{}
for { for {
if relay.close { conn.SetReadDeadline(time.Now().Add(10 * time.Second))
select {
case <-relay.context.Done():
log.Println("Context Done")
rtpConn.Close() rtpConn.Close()
relay.listener.Close()
break break
} default:
if relay.close {
conn.ReadFrom(buffer) rtpConn.Close()
packetReader.Reset(buffer) relay.listener.Close()
binary.Read(packetReader, binary.BigEndian, &header)
if header.Magic != 0xBCDE {
log.Printf("Received message with invalid magic (%x).", header.Magic)
break
}
if header.Length > 0 {
payload = make([]byte, header.Length)
_, err := io.ReadFull(packetReader, payload)
if err != nil {
log.Printf("Read Error: %s\n", err)
break break
} }
} else {
payload = []byte{}
}
switch header.MessageType { conn.ReadFrom(buffer)
case 0x0001: // H.264 Data packetReader.Reset(buffer)
frameBuffer.Write(payload)
case 0x0002: // Time
// Append the Framebuffer
packetBuffer.Write(frameBuffer.Bytes())
// Send out the packet binary.Read(packetReader, binary.BigEndian, &header)
rtpConn.Write(packetBuffer.Bytes())
// Prepare the next packet if header.Magic != 0xBCDE {
packetBuffer.Reset() log.Printf("Received message with invalid magic (%x).", header.Magic)
packetBuffer.Write([]byte{0x80, 0x63}) break
binary.Write(&packetBuffer, binary.BigEndian, sequenceNumber+1) }
binary.Write(&packetBuffer, binary.BigEndian, (uint32)(elapsed)*90)
binary.Write(&packetBuffer, binary.BigEndian, (uint64(0)))
// Reset the Framebuffer if header.Length > 0 {
frameBuffer.Reset() payload = make([]byte, header.Length)
sequenceNumber++ _, err := io.ReadFull(packetReader, payload)
if err != nil {
log.Printf("Read Error: %s\n", err)
break
}
} else {
payload = []byte{}
}
elapsed = binary.LittleEndian.Uint32(payload[12:]) switch header.MessageType {
default: case 0x0001: // H.264 Data
log.Printf("Received Unknown Message: %+v\n", header) frameBuffer.Write(payload)
log.Printf("Payload:\n%s\n", hex.Dump(payload)) case 0x0002: // Time
// Append the Framebuffer
packetBuffer.Write(frameBuffer.Bytes())
// Send out the packet
rtpConn.Write(packetBuffer.Bytes())
// Prepare the next packet
packetBuffer.Reset()
packetBuffer.Write([]byte{0x80, 0x63})
binary.Write(&packetBuffer, binary.BigEndian, sequenceNumber+1)
binary.Write(&packetBuffer, binary.BigEndian, (uint32)(elapsed)*90)
binary.Write(&packetBuffer, binary.BigEndian, (uint64(0)))
// Reset the Framebuffer
frameBuffer.Reset()
sequenceNumber++
elapsed = binary.LittleEndian.Uint32(payload[12:])
default:
log.Printf("Received Unknown Message: %+v\n", header)
log.Printf("Payload:\n%s\n", hex.Dump(payload))
}
} }
} }
rtpConn.Close()
} }
// Stop stops listening for packets // Stop stops listening for packets

View file

@ -2,6 +2,7 @@ package rtsp
import ( import (
"bufio" "bufio"
"context"
"crypto/md5" "crypto/md5"
"fmt" "fmt"
"log" "log"
@ -22,10 +23,11 @@ type Server struct {
rtpRelay *libipcamera.RTPRelay rtpRelay *libipcamera.RTPRelay
camera *libipcamera.Camera camera *libipcamera.Camera
sdp string sdp string
context context.Context
} }
// CreateServer creates a new Server instance // CreateServer creates a new Server instance
func CreateServer(localIP string, port int, camera *libipcamera.Camera) *Server { func CreateServer(ctx context.Context, localIP string, port int, camera *libipcamera.Camera) *Server {
server := &Server{ server := &Server{
localIP: localIP, localIP: localIP,
localPort: port, localPort: port,
@ -33,6 +35,7 @@ func CreateServer(localIP string, port int, camera *libipcamera.Camera) *Server
remoteRTPPort: 0, remoteRTPPort: 0,
remoteIP: "", remoteIP: "",
sdp: "v=0\r\ns=ActionCamera\r\nm=video 0 RTP/AVP 99\r\na=rtpmap:99 H264/90000", sdp: "v=0\r\ns=ActionCamera\r\nm=video 0 RTP/AVP 99\r\na=rtpmap:99 H264/90000",
context: ctx,
} }
return server return server
} }
@ -49,14 +52,20 @@ func (s *Server) ListenAndServe() error {
log.Printf("RTSP Server waiting for connections on %s:%d\n", s.localIP, s.localPort) log.Printf("RTSP Server waiting for connections on %s:%d\n", s.localIP, s.localPort)
for { for {
conn, err := listener.Accept() select {
if err != nil { case <-s.context.Done():
log.Printf("ERROR accepting connection: %s\n", err) listener.Close()
break
default:
conn, err := listener.Accept()
if err != nil {
log.Printf("ERROR accepting connection: %s\n", err)
}
log.Printf("Accepted new RTSP Client %s\n", conn.RemoteAddr().String())
go s.handleClient(conn)
} }
log.Printf("Accepted new RTSP Client %s\n", conn.RemoteAddr().String())
go s.handleClient(conn)
} }
} }
@ -126,7 +135,7 @@ func (s *Server) handleRequest(packet []string, conn net.Conn) {
conn.Write([]byte("\r\n")) conn.Write([]byte("\r\n"))
case "PLAY": case "PLAY":
s.rtpRelay = libipcamera.CreateRTPRelay(net.ParseIP(s.remoteIP), s.remoteRTPPort) s.rtpRelay = libipcamera.CreateRTPRelay(s.context, net.ParseIP(s.remoteIP), s.remoteRTPPort)
s.camera.StartPreviewStream() s.camera.StartPreviewStream()
writeStatus(conn, 200, "OK") writeStatus(conn, 200, "OK")