commit - 43c71de36cec7d072d2436925b93cf2e94b71606
commit + 9016a0fbf4855edbd3463ae2edf332d3a8f44466
blob - /dev/null
blob + ef52fa875cf1fa025d4b8e2a97d5e55d72a90ac6 (mode 644)
--- /dev/null
+++ stream.go
+package icinga
+
+import (
+ "bufio"
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "net/http"
+)
+
+// An Event represents an event from the Icinga Event Stream.
+type Event struct {
+ // Type indicates the type of the stream, such as CheckResult.
+ Type string `json:"type"`
+ // Host is the name of an Icinga Host object which this event relates to.
+ Host string `json:"host"`
+ // Service is the name of an Icinga Service object which this event relates to.
+ // It is empty when a CheckResult event of a Host object is received.
+ Service string `json:"service"`
+ Acknowledgement bool `json:"acknowledgement"`
+ CheckResult *CheckResult `json:"check_result"`
+ Error error
+}
+
+// Subscribe returns a channel through which events from the
+// corresponding Icinga Event Stream named in typ are sent.
+// Queue is a unique identifier Icinga uses to manage stream clients.
+// Filter is a filter expression which modifies which events will be received;
+// the empty string means all events are sent.
+//
+// Any errors on initialising the connection are returned immediately as a value.
+// Subsequent errors reading the stream are set in the Error field of sent Events.
+// Callers should handle both cases and resubscribe as required.
+func (c *Client) Subscribe(typ, queue, filter string) (<-chan Event, error) {
+ m := map[string]interface{}{
+ "types": []string{typ},
+ "queue": queue,
+ "filter": filter,
+ }
+ buf := &bytes.Buffer{}
+ if err := json.NewEncoder(buf).Encode(m); err != nil {
+ return nil, fmt.Errorf("encode stream parameters: %w", err)
+ }
+ ch := make(chan Event)
+ resp, err := c.post("/events", buf)
+ if err != nil {
+ return nil, err
+ }
+ if resp.StatusCode != http.StatusOK {
+ iresp, err := parseResponse(resp.Body)
+ if err != nil {
+ return nil, fmt.Errorf("request events: parse error response: %w", err)
+ }
+ return nil, fmt.Errorf("request events: %w", iresp.Error)
+ }
+ sc := bufio.NewScanner(resp.Body)
+ go func() {
+ for sc.Scan() {
+ var ev Event
+ if err := json.Unmarshal(sc.Bytes(), &ev); err != nil {
+ ch <- Event{Error: fmt.Errorf("decode event: %w", err)}
+ continue
+ }
+ ch <- ev
+ }
+ if sc.Err() != nil {
+ ch <- Event{Error: fmt.Errorf("scan response: %w", err)}
+ }
+ resp.Body.Close()
+ close(ch)
+ }()
+ return ch, nil
+}