Commit Diff


commit - d65c457cdb78bfe473db953cb4cd0f8243ec21be
commit + 2f7e40d9e97ce934b511c4468f323f9fbaa27f29
blob - ef52fa875cf1fa025d4b8e2a97d5e55d72a90ac6
blob + 554e4068696e6baff7d643be4d261667e4053285
--- stream.go
+++ stream.go
@@ -41,7 +41,6 @@ func (c *Client) Subscribe(typ, queue, filter string) 
 	if err := json.NewEncoder(buf).Encode(m); err != nil {
 		return nil, fmt.Errorf("encode stream parameters: %w", err)
 	}
-	ch := make(chan Event)
 	resp, err := c.post("/events", buf)
 	if err != nil {
 		return nil, err
@@ -54,17 +53,18 @@ func (c *Client) Subscribe(typ, queue, filter string) 
 		return nil, fmt.Errorf("request events: %w", iresp.Error)
 	}
 	sc := bufio.NewScanner(resp.Body)
+	ch := make(chan Event)
 	go func() {
 		for sc.Scan() {
 			var ev Event
 			if err := json.Unmarshal(sc.Bytes(), &ev); err != nil {
-				ch <- Event{Error: fmt.Errorf("decode event: %w", err)}
+				ch <- Event{Error: fmt.Errorf("decode event: %v", err)}
 				continue
 			}
 			ch <- ev
 		}
 		if sc.Err() != nil {
-			ch <- Event{Error: fmt.Errorf("scan response: %w", err)}
+			ch <- Event{Error: fmt.Errorf("scan response: %w", sc.Err())}
 		}
 		resp.Body.Close()
 		close(ch)