Blob


1 package icinga
3 import (
4 "bufio"
5 "bytes"
6 "encoding/json"
7 "fmt"
8 "net/http"
9 )
11 // An Event represents an event from the Icinga Event Stream.
12 type Event struct {
13 // Type indicates the type of the stream, such as CheckResult.
14 Type string `json:"type"`
15 // Host is the name of an Icinga Host object which this event relates to.
16 Host string `json:"host"`
17 // Service is the name of an Icinga Service object which this event relates to.
18 // It is empty when a CheckResult event of a Host object is received.
19 Service string `json:"service"`
20 Acknowledgement bool `json:"acknowledgement"`
21 CheckResult *CheckResult `json:"check_result"`
22 Error error
23 }
25 // Subscribe returns a channel through which events from the
26 // corresponding Icinga Event Stream named in typ are sent.
27 // Queue is a unique identifier Icinga uses to manage stream clients.
28 // Filter is a filter expression which modifies which events will be received;
29 // the empty string means all events are sent.
30 //
31 // Any errors on initialising the connection are returned immediately as a value.
32 // Subsequent errors reading the stream are set in the Error field of sent Events.
33 // Callers should handle both cases and resubscribe as required.
34 func (c *Client) Subscribe(typ, queue, filter string) (<-chan Event, error) {
35 m := map[string]interface{}{
36 "types": []string{typ},
37 "queue": queue,
38 "filter": filter,
39 }
40 buf := &bytes.Buffer{}
41 if err := json.NewEncoder(buf).Encode(m); err != nil {
42 return nil, fmt.Errorf("encode stream parameters: %w", err)
43 }
44 ch := make(chan Event)
45 resp, err := c.post("/events", buf)
46 if err != nil {
47 return nil, err
48 }
49 if resp.StatusCode != http.StatusOK {
50 iresp, err := parseResponse(resp.Body)
51 if err != nil {
52 return nil, fmt.Errorf("request events: parse error response: %w", err)
53 }
54 return nil, fmt.Errorf("request events: %w", iresp.Error)
55 }
56 sc := bufio.NewScanner(resp.Body)
57 go func() {
58 for sc.Scan() {
59 var ev Event
60 if err := json.Unmarshal(sc.Bytes(), &ev); err != nil {
61 ch <- Event{Error: fmt.Errorf("decode event: %w", err)}
62 continue
63 }
64 ch <- ev
65 }
66 if sc.Err() != nil {
67 ch <- Event{Error: fmt.Errorf("scan response: %w", err)}
68 }
69 resp.Body.Close()
70 close(ch)
71 }()
72 return ch, nil
73 }