commit 2f7e40d9e97ce934b511c4468f323f9fbaa27f29
from: Oliver Lowe
date: Thu Apr 27 03:19:35 2023 UTC

Correctly return scan errors

commit - d65c457cdb78bfe473db953cb4cd0f8243ec21be
commit + 2f7e40d9e97ce934b511c4468f323f9fbaa27f29
blob - ef52fa875cf1fa025d4b8e2a97d5e55d72a90ac6
blob + 554e4068696e6baff7d643be4d261667e4053285
--- stream.go
+++ stream.go
@@ -41,7 +41,6 @@ func (c *Client) Subscribe(typ, queue, filter string)
 	if err := json.NewEncoder(buf).Encode(m); err != nil {
 		return nil, fmt.Errorf("encode stream parameters: %w", err)
 	}
-	ch := make(chan Event)
 	resp, err := c.post("/events", buf)
 	if err != nil {
 		return nil, err
@@ -54,17 +53,18 @@ func (c *Client) Subscribe(typ, queue, filter string)
 		return nil, fmt.Errorf("request events: %w", iresp.Error)
 	}
 	sc := bufio.NewScanner(resp.Body)
+	ch := make(chan Event)
 	go func() {
 		for sc.Scan() {
 			var ev Event
 			if err := json.Unmarshal(sc.Bytes(), &ev); err != nil {
-				ch <- Event{Error: fmt.Errorf("decode event: %w", err)}
+				ch <- Event{Error: fmt.Errorf("decode event: %v", err)}
 				continue
 			}
 			ch <- ev
 		}
 		if sc.Err() != nil {
-			ch <- Event{Error: fmt.Errorf("scan response: %w", err)}
+			ch <- Event{Error: fmt.Errorf("scan response: %w", sc.Err())}
 		}
 		resp.Body.Close()
 		close(ch)