Skip to content
This repository was archived by the owner on Jun 12, 2024. It is now read-only.

Commit ed88685

Browse files
authored
Extract the HTTP request task from Hub (#56)
* This also involves extracting the token creator from Hub * Also, moved the retention and SQL jobs to `handlers` package * Fixed a go routine leak in the HTTP task handler's code * Use `time.Duration` for the duration and remove `Hub` from strings * Allow variations of JSON content types * Add test case that proves empty response works
1 parent 40308bb commit ed88685

File tree

11 files changed

+1047
-8
lines changed

11 files changed

+1047
-8
lines changed

pkg/queue/handlers/errors.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package handlers
2+
3+
import "github.com/pkg/errors"
4+
5+
var (
6+
ErrSerializingHearbeat = errors.New("failed to serialize progress payload while sending heartbeat")
7+
)

pkg/queue/handlers/http_request.go

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
package handlers
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"strings"
10+
"time"
11+
12+
"github.com/contiamo/go-base/pkg/queue"
13+
"github.com/contiamo/go-base/pkg/queue/workers"
14+
"github.com/contiamo/go-base/pkg/tokens"
15+
"github.com/contiamo/go-base/pkg/tracing"
16+
"github.com/opentracing/opentracing-go"
17+
otext "github.com/opentracing/opentracing-go/ext"
18+
"github.com/sirupsen/logrus"
19+
)
20+
21+
var (
22+
// APIRequestTask marks a task as an API request task
23+
APIRequestTask queue.TaskType = "api-request"
24+
)
25+
26+
// APIRequestTaskSpec describes the specification of the API request task
27+
type APIRequestTaskSpec struct {
28+
// Method to use for the API request
29+
Method string `json:"method"`
30+
// URL is the target URL for the request.
31+
// Must be an absolute URL that contains the scheme and the host components.
32+
URL string `json:"url"`
33+
// RequestBody to send
34+
RequestBody string `json:"requestBody"`
35+
// RequestHeaders to send
36+
RequestHeaders map[string]string `json:"requestHeaders"`
37+
// Authorized if `true` the task will send a header with the
38+
// signed JWT token as a part of the request
39+
Authorized bool `json:"authorized"`
40+
// ExpectedStatus is an HTTP status expected as a response.
41+
// If it does not match the actual status the task fails
42+
ExpectedStatus int `json:"expectedStatus"`
43+
}
44+
45+
type APIRequestStage string
46+
47+
var (
48+
// RequestPreparing means the task is preparing the request parameters and the body
49+
RequestPreparing APIRequestStage = "preparing"
50+
// RequestPending means the request was sent, awaiting the response
51+
RequestPending APIRequestStage = "pending"
52+
// RequestResponse means the response was received
53+
RequestResponse APIRequestStage = "response"
54+
)
55+
56+
// APIRequestProgress describes the progress of the API request task stored during
57+
// the heartbeat handling
58+
type APIRequestProgress struct {
59+
// Stage is the current stage of the API request task
60+
Stage APIRequestStage `json:"stage,omitempty"`
61+
// Duration of the HTTP request
62+
Duration *time.Duration `json:"duration,omitempty"`
63+
// ReturnedStatus is a status returned from the target endpoint
64+
ReturnedStatus *int `json:"returnedStatus,omitempty"`
65+
// ReturnedBody is a body returned from the target endpoint
66+
ReturnedBody *string `json:"returnedBody,omitempty"`
67+
// ErrorMessage contains an error message string if it occurs during the update process
68+
ErrorMessage *string `json:"errorMessage,omitempty"`
69+
}
70+
71+
// NewAPIRequestHandler creates a task handler that makes an HTTP request to a target API.
72+
// The response from the request must be valid JSON or a stream of new line-separated
73+
// JSON objects, otherwise the task will fail.
74+
func NewAPIRequestHandler(tokenHeaderName string, tokenCreator tokens.Creator, client *http.Client) workers.TaskHandler {
75+
return &apiRequestHandler{
76+
Tracer: tracing.NewTracer("handlers", "APIRequestHandler"),
77+
tokenHeaderName: tokenHeaderName,
78+
tokenCreator: tokenCreator,
79+
client: client,
80+
}
81+
}
82+
83+
type apiRequestHandler struct {
84+
tracing.Tracer
85+
tokenHeaderName string
86+
tokenCreator tokens.Creator
87+
client *http.Client
88+
}
89+
90+
func (h *apiRequestHandler) Process(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) (err error) {
91+
span, ctx := h.StartSpan(ctx, "Process")
92+
defer func() {
93+
close(heartbeats)
94+
h.FinishSpan(span, err)
95+
}()
96+
span.SetTag("task.id", task.ID)
97+
span.SetTag("task.queue", task.Queue)
98+
span.SetTag("task.type", task.Type)
99+
span.SetTag("task.spec", string(task.Spec))
100+
101+
logrus := logrus.WithField("type", task.Type).WithField("id", task.ID)
102+
103+
logrus.Debug("starting the API request task...")
104+
105+
var progress APIRequestProgress
106+
defer func() {
107+
// we check for errSerializingHearbeat so we don't cause
108+
// a recursion call
109+
if err == nil || err == ErrSerializingHearbeat {
110+
return
111+
}
112+
message := err.Error()
113+
progress.ErrorMessage = &message
114+
_ = sendAPIRequestProgress(progress, heartbeats)
115+
}()
116+
117+
var spec APIRequestTaskSpec
118+
err = json.Unmarshal(task.Spec, &spec)
119+
if err != nil {
120+
return err
121+
}
122+
123+
progress.Stage = RequestPreparing
124+
err = sendAPIRequestProgress(progress, heartbeats)
125+
if err != nil {
126+
return err
127+
}
128+
129+
var payload io.Reader
130+
if spec.RequestBody != "" {
131+
payload = strings.NewReader(spec.RequestBody)
132+
}
133+
134+
req, err := http.NewRequest(spec.Method, spec.URL, payload)
135+
if err != nil {
136+
return err
137+
}
138+
139+
req.Header.Add("User-Agent", "Contiamo API Request Task")
140+
141+
for name, value := range spec.RequestHeaders {
142+
req.Header.Add(name, value)
143+
}
144+
145+
if spec.Authorized {
146+
token, err := h.tokenCreator.Create("apiRequestTask")
147+
if err != nil {
148+
return err
149+
}
150+
151+
req.Header.Add(h.tokenHeaderName, token)
152+
}
153+
154+
err = opentracing.GlobalTracer().Inject(
155+
span.Context(),
156+
opentracing.HTTPHeaders,
157+
opentracing.HTTPHeadersCarrier(req.Header),
158+
)
159+
if err != nil {
160+
otext.Error.Set(span, true)
161+
span.SetTag("tracing.inject.err", err.Error())
162+
err = nil
163+
}
164+
165+
progress.Stage = RequestPending
166+
err = sendAPIRequestProgress(progress, heartbeats)
167+
if err != nil {
168+
return err
169+
}
170+
171+
now := time.Now()
172+
defer func() {
173+
duration := time.Since(now)
174+
progress.Duration = &duration
175+
err := sendAPIRequestProgress(progress, heartbeats)
176+
if err != nil {
177+
logrus.Error(err)
178+
}
179+
}()
180+
181+
resp, err := h.client.Do(req.WithContext(ctx))
182+
if err != nil {
183+
return err
184+
}
185+
defer resp.Body.Close()
186+
187+
contentType := resp.Header.Get("content-type")
188+
if !strings.Contains(contentType, "json") {
189+
return fmt.Errorf(
190+
"unexpected response content type, expected JSON, got `%s`",
191+
contentType,
192+
)
193+
}
194+
195+
progress.Stage = RequestResponse
196+
progress.ReturnedStatus = &resp.StatusCode
197+
err = sendAPIRequestProgress(progress, heartbeats)
198+
if err != nil {
199+
return err
200+
}
201+
202+
// the task would time out if the heartbeat was not sent for 30 seconds
203+
ticker := time.NewTicker(10 * time.Second)
204+
defer ticker.Stop()
205+
go func() {
206+
for {
207+
select {
208+
case <-ticker.C:
209+
err := sendAPIRequestProgress(progress, heartbeats)
210+
if err != nil {
211+
logrus.Error(err)
212+
}
213+
case <-ctx.Done():
214+
return
215+
}
216+
}
217+
}()
218+
219+
decoder := json.NewDecoder(resp.Body)
220+
for decoder.More() {
221+
err = ctx.Err()
222+
if err != nil {
223+
return err
224+
}
225+
var m json.RawMessage
226+
err = decoder.Decode(&m)
227+
if err != nil {
228+
return err
229+
}
230+
respString := string(m)
231+
progress.ReturnedBody = &respString
232+
err = sendAPIRequestProgress(progress, heartbeats)
233+
if err != nil {
234+
return err
235+
}
236+
}
237+
238+
if spec.ExpectedStatus != resp.StatusCode {
239+
return fmt.Errorf("expected status %d but got %d", spec.ExpectedStatus, resp.StatusCode)
240+
}
241+
242+
return nil
243+
}
244+
245+
func sendAPIRequestProgress(progress APIRequestProgress, heartbeats chan<- queue.Progress) (err error) {
246+
logrus.
247+
WithField("method", "sendAPIRequestProgress").
248+
Debugf("%+v", progress)
249+
250+
bytes, err := json.Marshal(progress)
251+
if err != nil {
252+
logrus.Error(err)
253+
return ErrSerializingHearbeat
254+
}
255+
256+
heartbeats <- bytes
257+
return nil
258+
}

0 commit comments

Comments
 (0)