commit - 15e217461acc59522f43002bbd5233d88d0ba10f
commit + 74364bb7c33ecf3fdef8c8a361310c2f1992c1bd
blob - /dev/null
blob + 0d898d3243353c876ca3d00a3157ab47277186b9 (mode 644)
--- /dev/null
+++ cmd/hlsserve/hlsserve.go
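+// Command hlsserve serves a live HLS stream from MPEG-TS video received over TCP.
+// Segments are written to dir, which defaults to the "hlsserve" directory
+// under the user cache directory.
+//
+// Usage:
+//
+//	hlsserve [-l address] [dir]
+//
+// The options are:
+//
+//	-l address
+//		Listen for MPEG-TS streams on address, in host:port format. The
+//		default is ":9000".
+//
+// There is no option for the HTTP address yet: HTTP clients are always
+// served on ":8000". The playlist is available at /playlist.m3u8.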
+package main
+
+import (
+ "bytes"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "net/http"
+ "os"
+ "path"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/untangledco/streaming/m3u8"
+ "github.com/untangledco/streaming/mpegts"
+)
+
+// init configures the standard logger (no timestamp, "hlsserve: " prefix)
+// and points cacheDir at the "hlsserve" directory beneath the user's
+// cache directory. The program exits if the user cache directory cannot
+// be determined.
+func init() {
+	log.SetPrefix("hlsserve: ")
+	log.SetFlags(0)
+	base, err := os.UserCacheDir()
+	if err != nil {
+		log.Fatalln("user cache dir:", err)
+	}
+	cacheDir = filepath.Join(base, "hlsserve")
+}
+
+const (
+	// maxTSBytes is a rule-of-thumb payload size for UDP transport:
+	// seven MPEG-TS packets.
+	maxTSBytes int = 7 * mpegts.PacketSize
+
+	// segmentDuration is the interval at which segment files are cut and
+	// the target duration advertised in the playlist.
+	segmentDuration = 3 * time.Second
+)
+
+var (
+	// cacheDir is the directory segments are written to and served from.
+	cacheDir string
+
+	// sequence counts the segments deleted by removeOld; makePlaylist
+	// reports it as the playlist's sequence number.
+	sequence int
+)
+
+// removeOld deletes every file in dir whose name ends in ".ts" and whose
+// modification time lies more than maxAge in the past. The package-level
+// sequence counter is incremented once per deleted file. It stops at, and
+// returns, the first error met while reading dir, inspecting an entry or
+// deleting a file.
+func removeOld(dir string, maxAge time.Duration) error {
+	entries, err := os.ReadDir(dir)
+	if err != nil {
+		return err
+	}
+	for _, entry := range entries {
+		name := entry.Name()
+		if !strings.HasSuffix(name, ".ts") {
+			continue
+		}
+		info, err := entry.Info()
+		if err != nil {
+			return err
+		}
+		if time.Since(info.ModTime()) <= maxAge {
+			// Still fresh enough to keep.
+			continue
+		}
+		if err := os.Remove(filepath.Join(dir, name)); err != nil {
+			return err
+		}
+		sequence++
+	}
+	return nil
+}
+
+// writeSegments reads MPEG-TS packets from r and stores them in dir as
+// numbered segment files named 0000.ts, 0001.ts and so on. Packets are
+// collected in memory; each time a value arrives on ch, the collected
+// packets are written out as the next segment and collection starts
+// afresh.
+//
+// When r is exhausted, any packets still buffered are written as a final
+// segment and writeSegments returns nil. A scan, encode or file write
+// failure is returned as an error.
+func writeSegments(dir string, r io.Reader, ch <-chan time.Time) error {
+	var segment int
+	var buf bytes.Buffer
+	// flush writes the buffered packets as the next numbered segment.
+	flush := func() error {
+		name := filepath.Join(dir, fmt.Sprintf("%04d.ts", segment))
+		if err := os.WriteFile(name, buf.Bytes(), 0644); err != nil {
+			return err
+		}
+		buf.Reset()
+		segment++
+		return nil
+	}
+
+	sc := mpegts.NewScanner(r)
+	for {
+		select {
+		case <-ch:
+			if err := flush(); err != nil {
+				return err
+			}
+		default:
+			// Scan presumably blocks while waiting on r, so ch is only
+			// polled between packets.
+			if !sc.Scan() {
+				if err := sc.Err(); err != nil {
+					return fmt.Errorf("segment %d: scan: %w", segment, err)
+				}
+				// Assumes Scan returning false with a nil Err means end of
+				// input, as with bufio.Scanner. Stop here instead of
+				// spinning on a reader that has nothing more to give.
+				if buf.Len() == 0 {
+					return nil
+				}
+				return flush()
+			}
+			if err := mpegts.Encode(&buf, sc.Packet()); err != nil {
+				return fmt.Errorf("segment %d: encode packet: %w", segment, err)
+			}
+		}
+	}
+}
+
+// makePlaylist builds a playlist listing every file in dir that matches
+// "*.ts", in the order filepath.Glob returns them. Each segment's URI is
+// the file's base name and its duration is segmentDuration. The playlist's
+// sequence number is the current value of the package-level sequence
+// counter.
+//
+// It returns an error only if the glob pattern built from dir is
+// malformed.
+func makePlaylist(dir string) (*m3u8.Playlist, error) {
+	names, err := filepath.Glob(filepath.Join(dir, "*.ts"))
+	if err != nil {
+		return nil, fmt.Errorf("find segments: %w", err)
+	}
+	playlist := &m3u8.Playlist{
+		Version:        7,
+		TargetDuration: segmentDuration,
+		Sequence:       sequence,
+	}
+	for _, name := range names {
+		// Glob returns paths using the OS separator, so the base name must
+		// be taken with filepath rather than the slash-only path package.
+		seg := m3u8.Segment{
+			URI:      filepath.Base(name),
+			Duration: playlist.TargetDuration,
+		}
+		playlist.Segments = append(playlist.Segments, seg)
+	}
+	return playlist, nil
+}
+
+// usage is the synopsis printed to standard error when more than one
+// positional argument is given. The directory argument is optional.
+const usage string = "usage: hlsserve [-l address] [dir]"
+
+// servePlaylist returns a handler that builds a fresh playlist from the
+// segments in dir on every request and writes it to the response with the
+// m3u8 content type. If the playlist cannot be built, the handler replies
+// with status 500 and the error text. A failure while encoding is only
+// logged.
+func servePlaylist(dir string) http.HandlerFunc {
+	handler := func(w http.ResponseWriter, req *http.Request) {
+		pl, err := makePlaylist(dir)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		w.Header().Set("Content-Type", m3u8.MimeType)
+		err = m3u8.Encode(w, pl)
+		if err != nil {
+			log.Printf("encode playlist: %v", err)
+		}
+	}
+	return handler
+}
+
+// setCache wraps next so that every response carries a Cache-Control
+// header of "max-age=<seconds>". The header is set before next runs.
+func setCache(seconds int, next http.Handler) http.HandlerFunc {
+	// The header value depends only on seconds, so build it once.
+	value := fmt.Sprintf("max-age=%d", seconds)
+	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		w.Header().Set("Cache-Control", value)
+		next.ServeHTTP(w, req)
+	})
+}
+
+// listen is the address on which the MPEG-TS stream is accepted. It is
+// set by the -l flag.
+var listen = ":9000"
+
+// init registers the command-line flags. Parsing is left to main so that
+// flags registered elsewhere (for example by the testing package) are
+// defined before the command line is parsed.
+func init() {
+	flag.StringVar(&listen, "l", ":9000", "listen")
+}
+
+// main parses the command line, waits for a single TCP connection
+// carrying an MPEG-TS stream, and then serves the resulting segments and
+// playlist over HTTP on ":8000".
+//
+// At most one positional argument is accepted; when given, it replaces
+// the default segment directory.
+func main() {
+	flag.Parse()
+	args := flag.Args()
+	if len(args) > 1 {
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(2)
+	}
+	if len(args) == 1 {
+		cacheDir = args[0]
+	}
+	if err := os.MkdirAll(cacheDir, 0755); err != nil {
+		log.Fatal(err)
+	}
+
+	ln, err := net.Listen("tcp", listen)
+	if err != nil {
+		log.Fatal(err)
+	}
+	// Only the first connection is used as the stream source, and the
+	// HTTP server is not started until it has been accepted.
+	conn, err := ln.Accept()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Cut the incoming stream into one segment file per tick.
+	go func() {
+		defer conn.Close()
+		ticker := time.NewTicker(segmentDuration)
+		defer ticker.Stop()
+		if err := writeSegments(cacheDir, conn, ticker.C); err != nil {
+			log.Fatalln("write segments:", err)
+		}
+	}()
+
+	// Once per segment interval, delete segments older than 24 segment
+	// durations.
+	go func() {
+		ticker := time.NewTicker(segmentDuration)
+		for range ticker.C {
+			if err := removeOld(cacheDir, 3*8*segmentDuration); err != nil {
+				log.Println("remove old segments:", err)
+			}
+		}
+	}()
+
+	http.Handle("/playlist.m3u8", servePlaylist(cacheDir))
+	fsys := http.FileServer(http.FS(os.DirFS(cacheDir)))
+	http.Handle("/", setCache(60, fsys))
+	log.Fatal(http.ListenAndServe(":8000", nil))
+}
blob - 0d898d3243353c876ca3d00a3157ab47277186b9 (mode 644)
blob + /dev/null
--- m3u8/cmd/hlsserve/hlsserve.go
+++ /dev/null
-// Command hlsserve serves a live HLS stream from MPEG-TS video received over TCP.
-// The options are:
-//
-// -l address
-// Listen for MPEG-TS streams on address, in host:port format. The
-// default is ":9000".
-// -h address
-// Listen for HTTP clients on address, in host:port format. The
-// default is ":8080".
-package main
-
-import (
- "bytes"
- "flag"
- "fmt"
- "io"
- "log"
- "net"
- "net/http"
- "os"
- "path"
- "path/filepath"
- "strings"
- "time"
-
- "github.com/untangledco/streaming/m3u8"
- "github.com/untangledco/streaming/mpegts"
-)
-
-func init() {
- log.SetFlags(0)
- log.SetPrefix("hlsserve: ")
- var err error
- cacheDir, err = os.UserCacheDir()
- if err != nil {
- log.Fatalln("user cache dir:", err)
- }
- cacheDir = filepath.Join(cacheDir, "hlsserve")
-}
-
-// rule of thumb for UDP transport
-const maxTSBytes int = 7 * mpegts.PacketSize
-
-const segmentDuration = 3 * time.Second
-
-var cacheDir string
-var sequence int
-
-func removeOld(dir string, maxAge time.Duration) error {
- ents, err := os.ReadDir(dir)
- if err != nil {
- return err
- }
- for _, dent := range ents {
- if !strings.HasSuffix(dent.Name(), ".ts") {
- continue
- }
- stat, err := dent.Info()
- if err != nil {
- return err
- }
- if time.Since(stat.ModTime()) > maxAge {
- if err := os.Remove(filepath.Join(dir, dent.Name())); err != nil {
- return err
- }
- sequence++
- }
- }
- return nil
-}
-
-func writeSegments(dir string, r io.Reader, ch <-chan time.Time) error {
- var segment int
- segments := &bytes.Buffer{}
- sc := mpegts.NewScanner(r)
- for {
- select {
- case <-ch:
- s := fmt.Sprintf("%04d.ts", segment)
- fname := path.Join(dir, s)
- if err := os.WriteFile(fname, segments.Bytes(), 0644); err != nil {
- return err
- }
- segments.Reset()
- segment++
- default:
- if sc.Scan() {
- if err := mpegts.Encode(segments, sc.Packet()); err != nil {
- return fmt.Errorf("segment %d: encode packet: %w", segment, err)
- }
- }
- if sc.Err() != nil {
- return fmt.Errorf("segment %d: scan: %w", segment, sc.Err())
- }
- }
- }
- fmt.Fprintln(os.Stderr, "shouldn't really be returning here...?")
- return nil
-}
-
-func makePlaylist(dir string) (*m3u8.Playlist, error) {
- names, err := filepath.Glob(filepath.Join(dir, "*.ts"))
- if err != nil {
- return nil, fmt.Errorf("find segments: %w", err)
- }
- playlist := &m3u8.Playlist{
- Version: 7,
- TargetDuration: segmentDuration,
- Sequence: sequence,
- }
- for _, name := range names {
- seg := m3u8.Segment{URI: path.Base(name), Duration: playlist.TargetDuration}
- playlist.Segments = append(playlist.Segments, seg)
- }
- return playlist, nil
-}
-
-const usage string = "usage: hlsserve dir"
-
-func servePlaylist(dir string) http.HandlerFunc {
- return func(w http.ResponseWriter, req *http.Request) {
- playlist, err := makePlaylist(dir)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- w.Header().Set("Content-Type", m3u8.MimeType)
- if err := m3u8.Encode(w, playlist); err != nil {
- log.Printf("encode playlist: %v", err)
- }
- }
-}
-
-func setCache(seconds int, next http.Handler) http.HandlerFunc {
- return func(w http.ResponseWriter, req *http.Request) {
- w.Header().Set("Cache-Control", fmt.Sprintf("max-age=%d", seconds))
- next.ServeHTTP(w, req)
- }
-}
-
-var listen = ":9000"
-
-func init() {
- flag.StringVar(&listen, "l", ":9000", "listen")
- flag.Parse()
-}
-
-func main() {
- if len(flag.Args()) > 1 {
- fmt.Fprintln(os.Stderr, usage)
- os.Exit(2)
- } else if len(flag.Args()) == 1 {
- cacheDir = flag.Args()[0]
- }
- if err := os.MkdirAll(cacheDir, 0755); err != nil {
- log.Fatal(err)
- }
-
- ln, err := net.Listen("tcp", listen)
- if err != nil {
- log.Fatal(err)
- }
- conn, err := ln.Accept()
- if err != nil {
- log.Fatal(err)
- }
-
- go func() {
- ticker := time.NewTicker(segmentDuration)
- if err := writeSegments(cacheDir, conn, ticker.C); err != nil {
- log.Fatalln("write segments:", err)
- }
- }()
-
- go func() {
- ticker := time.NewTicker(segmentDuration)
- for {
- select {
- case <-ticker.C:
- if err := removeOld(cacheDir, 3*8*segmentDuration); err != nil {
- log.Println("remove old segments:", err)
- }
- }
- }
- }()
-
- http.Handle("/playlist.m3u8", servePlaylist(cacheDir))
- fsys := http.FileServer(http.FS(os.DirFS(cacheDir)))
- http.Handle("/", setCache(60, fsys))
- log.Fatal(http.ListenAndServe(":8000", nil))
-}