From e82f5a97c99f6d2ae6f9ba9e4e0ed00a908f6b6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20K=C3=B6ritz?= Date: Tue, 15 Oct 2019 12:59:06 +0200 Subject: [PATCH] Fixes #2, allow server goroutines to exit on deadlines and context cancel. --- actioncam.go | 28 +++++++++-- libipcamera/Camera.go | 7 ++- libipcamera/Discovery.go | 2 + libipcamera/RTPRelay.go | 102 ++++++++++++++++++++++----------------- rtsp/RTSPServer.go | 27 +++++++---- 5 files changed, 108 insertions(+), 58 deletions(-) diff --git a/actioncam.go b/actioncam.go index f1d3bff..7a9973b 100644 --- a/actioncam.go +++ b/actioncam.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "context" "encoding/binary" "encoding/hex" "fmt" @@ -10,6 +11,7 @@ import ( "net" "net/http" "os" + "os/signal" "path/filepath" "runtime" "runtime/pprof" @@ -20,10 +22,15 @@ import ( ) func connectAndLogin(ip net.IP, port int, username, password string, verbose bool) *libipcamera.Camera { - camera := libipcamera.CreateCamera(ip, port, username, password) + camera, err := libipcamera.CreateCamera(ip, port, username, password) + if err != nil { + log.Printf("ERROR instantiating camera: %s\n", err) + os.Exit(1) + } camera.SetVerbose(verbose) camera.Connect() camera.Login() + return camera } @@ -39,13 +46,15 @@ func main() { var camera *libipcamera.Camera + var applicationContext context.Context + var rootCmd = &cobra.Command{ Use: "actioncam [Cameras IP Address]", Short: "actioncam is a tool to stream the video preview of cheap action cameras without the mobile application", Args: cobra.MaximumNArgs(1), Run: func(cmd *cobra.Command, args []string) { defer camera.Disconnect() - relay := libipcamera.CreateRTPRelay(net.ParseIP("127.0.0.1"), 5220) + relay := libipcamera.CreateRTPRelay(applicationContext, net.ParseIP("127.0.0.1"), 5220) defer relay.Stop() camera.StartPreviewStream() @@ -53,6 +62,18 @@ func main() { bufio.NewReader(os.Stdin).ReadBytes('\n') }, PersistentPreRun: func(cmd *cobra.Command, args []string) { + signalChannel := make(chan os.Signal) + signal.Notify(signalChannel, os.Interrupt) + var cancel context.CancelFunc + applicationContext, cancel = context.WithCancel(context.Background()) + go func(cancel context.CancelFunc) { + select { + case sig := <-signalChannel: + log.Printf("Got signal %s, exiting...\n", sig) + cancel() + } + }(cancel) + if cpuprofile != "" { cpuprofileFile, err := os.Create(cpuprofile) if err != nil { @@ -232,8 +253,9 @@ func main() { Short: "Start an RTSP-Server serving the cameras preview.", Args: cobra.MaximumNArgs(1), Run: func(cmd *cobra.Command, args []string) { + rtspServer := rtsp.CreateServer(applicationContext, "127.0.0.1", 8554, camera) + defer rtspServer.Stop() - rtspServer := rtsp.CreateServer("127.0.0.1", 8554, camera) log.Printf("Created RTSP Server\n") err := rtspServer.ListenAndServe() diff --git a/libipcamera/Camera.go b/libipcamera/Camera.go index 1ab2496..23711a6 100644 --- a/libipcamera/Camera.go +++ b/libipcamera/Camera.go @@ -61,7 +61,10 @@ type StoredFile struct { } // CreateCamera creates a new Camera instance -func CreateCamera(ipAddress net.IP, port int, username, password string) *Camera { +func CreateCamera(ipAddress net.IP, port int, username, password string) (*Camera, error) { + if ipAddress == nil { + return nil, errors.New("Cannot create camera without an IP-Address") + } camera := &Camera{ ipAddress: ipAddress, port: port, @@ -70,7 +73,7 @@ func CreateCamera(ipAddress net.IP, port int, username, password string) *Camera messageHandlers: make(map[uint32][]MessageHandler, 0), verbose: true, } - return camera + return camera, nil } // Connect to the camera and start responding to keepalive packets diff --git a/libipcamera/Discovery.go b/libipcamera/Discovery.go index de9384d..513f4c9 100644 --- a/libipcamera/Discovery.go +++ b/libipcamera/Discovery.go @@ -13,6 +13,8 @@ var targetPorts = []int{22600, 21600} // AutodiscoverCamera will try to find a camera using UDP Broadcasts func AutodiscoverCamera(verbose bool) (net.IP, error) { conn, err := net.ListenPacket("udp", ":22601") + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + if err != nil { return nil, err } diff --git a/libipcamera/RTPRelay.go b/libipcamera/RTPRelay.go index e510dd7..e63c29d 100644 --- a/libipcamera/RTPRelay.go +++ b/libipcamera/RTPRelay.go @@ -2,11 +2,13 @@ package libipcamera import ( "bytes" + "context" "encoding/binary" "encoding/hex" "io" "log" "net" + "time" ) // RTPRelay holds information on the relaying stream listener @@ -15,11 +17,12 @@ type RTPRelay struct { targetIP net.IP targetPort int listener net.PacketConn + context context.Context } // CreateRTPRelay creates a UDP listener that handles live data // from the camera and forwards it as an RTP stream -func CreateRTPRelay(targetAddress net.IP, targetPort int) *RTPRelay { +func CreateRTPRelay(ctx context.Context, targetAddress net.IP, targetPort int) *RTPRelay { conn, err := net.ListenPacket("udp", ":6669") if err != nil { @@ -30,6 +33,7 @@ func CreateRTPRelay(targetAddress net.IP, targetPort int) *RTPRelay { targetIP: targetAddress, targetPort: targetPort, listener: conn, + context: ctx, } if err != nil { log.Printf("ERROR: %s\n", err) @@ -64,60 +68,70 @@ func handleCameraStream(relay RTPRelay, conn net.PacketConn) { packetBuffer := bytes.Buffer{} for { - if relay.close { + conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + + select { + case <-relay.context.Done(): + log.Println("Context Done") rtpConn.Close() + relay.listener.Close() break - } - - conn.ReadFrom(buffer) - packetReader.Reset(buffer) - - binary.Read(packetReader, binary.BigEndian, &header) - - if header.Magic != 0xBCDE { - log.Printf("Received message with invalid magic (%x).", header.Magic) - break - } - - if header.Length > 0 { - payload = make([]byte, header.Length) - _, err := io.ReadFull(packetReader, payload) - if err != nil { - log.Printf("Read Error: %s\n", err) + default: + if relay.close { + rtpConn.Close() + relay.listener.Close() break } - } else { - payload = []byte{} - } - switch header.MessageType { - case 0x0001: // H.264 Data - frameBuffer.Write(payload) - case 0x0002: // Time - // Append the Framebuffer - packetBuffer.Write(frameBuffer.Bytes()) + conn.ReadFrom(buffer) + packetReader.Reset(buffer) - // Send out the packet - rtpConn.Write(packetBuffer.Bytes()) + binary.Read(packetReader, binary.BigEndian, &header) - // Prepare the next packet - packetBuffer.Reset() - packetBuffer.Write([]byte{0x80, 0x63}) - binary.Write(&packetBuffer, binary.BigEndian, sequenceNumber+1) - binary.Write(&packetBuffer, binary.BigEndian, (uint32)(elapsed)*90) - binary.Write(&packetBuffer, binary.BigEndian, (uint64(0))) + if header.Magic != 0xBCDE { + log.Printf("Received message with invalid magic (%x).", header.Magic) + break + } - // Reset the Framebuffer - frameBuffer.Reset() - sequenceNumber++ + if header.Length > 0 { + payload = make([]byte, header.Length) + _, err := io.ReadFull(packetReader, payload) + if err != nil { + log.Printf("Read Error: %s\n", err) + break + } + } else { + payload = []byte{} + } - elapsed = binary.LittleEndian.Uint32(payload[12:]) - default: - log.Printf("Received Unknown Message: %+v\n", header) - log.Printf("Payload:\n%s\n", hex.Dump(payload)) + switch header.MessageType { + case 0x0001: // H.264 Data + frameBuffer.Write(payload) + case 0x0002: // Time + // Append the Framebuffer + packetBuffer.Write(frameBuffer.Bytes()) + + // Send out the packet + rtpConn.Write(packetBuffer.Bytes()) + + // Prepare the next packet + packetBuffer.Reset() + packetBuffer.Write([]byte{0x80, 0x63}) + binary.Write(&packetBuffer, binary.BigEndian, sequenceNumber+1) + binary.Write(&packetBuffer, binary.BigEndian, (uint32)(elapsed)*90) + binary.Write(&packetBuffer, binary.BigEndian, (uint64(0))) + + // Reset the Framebuffer + frameBuffer.Reset() + sequenceNumber++ + + elapsed = binary.LittleEndian.Uint32(payload[12:]) + default: + log.Printf("Received Unknown Message: %+v\n", header) + log.Printf("Payload:\n%s\n", hex.Dump(payload)) + } } } - rtpConn.Close() } // Stop stops listening for packets diff --git a/rtsp/RTSPServer.go b/rtsp/RTSPServer.go index 212d73e..c5400f4 100644 --- a/rtsp/RTSPServer.go +++ b/rtsp/RTSPServer.go @@ -2,6 +2,7 @@ package rtsp import ( "bufio" + "context" "crypto/md5" "fmt" "log" @@ -22,10 +23,11 @@ type Server struct { rtpRelay *libipcamera.RTPRelay camera *libipcamera.Camera sdp string + context context.Context } // CreateServer creates a new Server instance -func CreateServer(localIP string, port int, camera *libipcamera.Camera) *Server { +func CreateServer(ctx context.Context, localIP string, port int, camera *libipcamera.Camera) *Server { server := &Server{ localIP: localIP, localPort: port, @@ -33,6 +35,7 @@ func CreateServer(localIP string, port int, camera *libipcamera.Camera) *Server remoteRTPPort: 0, remoteIP: "", sdp: "v=0\r\ns=ActionCamera\r\nm=video 0 RTP/AVP 99\r\na=rtpmap:99 H264/90000", + context: ctx, } return server } @@ -49,14 +52,20 @@ func (s *Server) ListenAndServe() error { log.Printf("RTSP Server waiting for connections on %s:%d\n", s.localIP, s.localPort) for { - conn, err := listener.Accept() - if err != nil { - log.Printf("ERROR accepting connection: %s\n", err) + select { + case <-s.context.Done(): + listener.Close() + break + default: + conn, err := listener.Accept() + if err != nil { + log.Printf("ERROR accepting connection: %s\n", err) + } + + log.Printf("Accepted new RTSP Client %s\n", conn.RemoteAddr().String()) + + go s.handleClient(conn) } - - log.Printf("Accepted new RTSP Client %s\n", conn.RemoteAddr().String()) - - go s.handleClient(conn) } } @@ -126,7 +135,7 @@ func (s *Server) handleRequest(packet []string, conn net.Conn) { conn.Write([]byte("\r\n")) case "PLAY": - s.rtpRelay = libipcamera.CreateRTPRelay(net.ParseIP(s.remoteIP), s.remoteRTPPort) + s.rtpRelay = libipcamera.CreateRTPRelay(s.context, net.ParseIP(s.remoteIP), s.remoteRTPPort) s.camera.StartPreviewStream() writeStatus(conn, 200, "OK")