Blame


1 9016a0fb 2023-03-08 o package icinga
2 9016a0fb 2023-03-08 o
3 9016a0fb 2023-03-08 o import (
4 9016a0fb 2023-03-08 o "bufio"
5 9016a0fb 2023-03-08 o "bytes"
6 9016a0fb 2023-03-08 o "encoding/json"
7 9016a0fb 2023-03-08 o "fmt"
8 9016a0fb 2023-03-08 o "net/http"
9 9016a0fb 2023-03-08 o )
10 9016a0fb 2023-03-08 o
11 9016a0fb 2023-03-08 o // An Event represents an event from the Icinga Event Stream.
12 9016a0fb 2023-03-08 o type Event struct {
13 9016a0fb 2023-03-08 o // Type indicates the type of the stream, such as CheckResult.
14 9016a0fb 2023-03-08 o Type string `json:"type"`
15 9016a0fb 2023-03-08 o // Host is the name of an Icinga Host object which this event relates to.
16 9016a0fb 2023-03-08 o Host string `json:"host"`
17 9016a0fb 2023-03-08 o // Service is the name of an Icinga Service object which this event relates to.
18 9016a0fb 2023-03-08 o // It is empty when a CheckResult event of a Host object is received.
19 9016a0fb 2023-03-08 o Service string `json:"service"`
20 9016a0fb 2023-03-08 o Acknowledgement bool `json:"acknowledgement"`
21 9016a0fb 2023-03-08 o CheckResult *CheckResult `json:"check_result"`
22 9016a0fb 2023-03-08 o Error error
23 9016a0fb 2023-03-08 o }
24 9016a0fb 2023-03-08 o
25 9016a0fb 2023-03-08 o // Subscribe returns a channel through which events from the
26 9016a0fb 2023-03-08 o // corresponding Icinga Event Stream named in typ are sent.
27 9016a0fb 2023-03-08 o // Queue is a unique identifier Icinga uses to manage stream clients.
28 9016a0fb 2023-03-08 o // Filter is a filter expression which modifies which events will be received;
29 9016a0fb 2023-03-08 o // the empty string means all events are sent.
30 9016a0fb 2023-03-08 o //
31 9016a0fb 2023-03-08 o // Any errors on initialising the connection are returned immediately as a value.
32 9016a0fb 2023-03-08 o // Subsequent errors reading the stream are set in the Error field of sent Events.
33 9016a0fb 2023-03-08 o // Callers should handle both cases and resubscribe as required.
34 9016a0fb 2023-03-08 o func (c *Client) Subscribe(typ, queue, filter string) (<-chan Event, error) {
35 9016a0fb 2023-03-08 o m := map[string]interface{}{
36 9016a0fb 2023-03-08 o "types": []string{typ},
37 9016a0fb 2023-03-08 o "queue": queue,
38 9016a0fb 2023-03-08 o "filter": filter,
39 9016a0fb 2023-03-08 o }
40 9016a0fb 2023-03-08 o buf := &bytes.Buffer{}
41 9016a0fb 2023-03-08 o if err := json.NewEncoder(buf).Encode(m); err != nil {
42 9016a0fb 2023-03-08 o return nil, fmt.Errorf("encode stream parameters: %w", err)
43 9016a0fb 2023-03-08 o }
44 9016a0fb 2023-03-08 o ch := make(chan Event)
45 9016a0fb 2023-03-08 o resp, err := c.post("/events", buf)
46 9016a0fb 2023-03-08 o if err != nil {
47 9016a0fb 2023-03-08 o return nil, err
48 9016a0fb 2023-03-08 o }
49 9016a0fb 2023-03-08 o if resp.StatusCode != http.StatusOK {
50 9016a0fb 2023-03-08 o iresp, err := parseResponse(resp.Body)
51 9016a0fb 2023-03-08 o if err != nil {
52 9016a0fb 2023-03-08 o return nil, fmt.Errorf("request events: parse error response: %w", err)
53 9016a0fb 2023-03-08 o }
54 9016a0fb 2023-03-08 o return nil, fmt.Errorf("request events: %w", iresp.Error)
55 9016a0fb 2023-03-08 o }
56 9016a0fb 2023-03-08 o sc := bufio.NewScanner(resp.Body)
57 9016a0fb 2023-03-08 o go func() {
58 9016a0fb 2023-03-08 o for sc.Scan() {
59 9016a0fb 2023-03-08 o var ev Event
60 9016a0fb 2023-03-08 o if err := json.Unmarshal(sc.Bytes(), &ev); err != nil {
61 9016a0fb 2023-03-08 o ch <- Event{Error: fmt.Errorf("decode event: %w", err)}
62 9016a0fb 2023-03-08 o continue
63 9016a0fb 2023-03-08 o }
64 9016a0fb 2023-03-08 o ch <- ev
65 9016a0fb 2023-03-08 o }
66 9016a0fb 2023-03-08 o if sc.Err() != nil {
67 9016a0fb 2023-03-08 o ch <- Event{Error: fmt.Errorf("scan response: %w", err)}
68 9016a0fb 2023-03-08 o }
69 9016a0fb 2023-03-08 o resp.Body.Close()
70 9016a0fb 2023-03-08 o close(ch)
71 9016a0fb 2023-03-08 o }()
72 9016a0fb 2023-03-08 o return ch, nil
73 9016a0fb 2023-03-08 o }