From 914650df420d5d74e1d36c4fee84f7249fb0f8a9 Mon Sep 17 00:00:00 2001 From: David Stotijn Date: Sun, 27 Sep 2020 14:58:37 +0200 Subject: [PATCH] Add polling for req logs, store them async, fix sort order --- admin/src/components/reqlog/LogsOverview.tsx | 4 +- pkg/db/cayley/cayley.go | 39 ++++++++++++++-- pkg/reqlog/reqlog.go | 49 +++++++++++--------- 3 files changed, 63 insertions(+), 29 deletions(-) diff --git a/admin/src/components/reqlog/LogsOverview.tsx b/admin/src/components/reqlog/LogsOverview.tsx index b71ca1b..e7907dd 100644 --- a/admin/src/components/reqlog/LogsOverview.tsx +++ b/admin/src/components/reqlog/LogsOverview.tsx @@ -28,7 +28,9 @@ function LogsOverview(): JSX.Element { const detailReqLogId = router.query.id as string; console.log(detailReqLogId); - const { loading, error, data } = useQuery(HTTP_REQUEST_LOGS); + const { loading, error, data } = useQuery(HTTP_REQUEST_LOGS, { + pollInterval: 1000, + }); const handleLogClick = (reqId: string) => { router.push("/proxy/logs?id=" + reqId, undefined, { diff --git a/pkg/db/cayley/cayley.go b/pkg/db/cayley/cayley.go index 9edb77b..c5e44d0 100644 --- a/pkg/db/cayley/cayley.go +++ b/pkg/db/cayley/cayley.go @@ -8,6 +8,7 @@ import ( "net/url" "path" "strings" + "sync" "time" "github.com/cayleygraph/cayley" @@ -54,6 +55,7 @@ type HTTPHeader struct { type Database struct { store *cayley.Handle schema *schema.Config + mu sync.Mutex } func init() { @@ -106,11 +108,14 @@ func (db *Database) Close() error { } func (db *Database) FindAllRequestLogs(ctx context.Context) ([]reqlog.Request, error) { + db.mu.Lock() + defer db.mu.Unlock() + var reqLogs []reqlog.Request var reqs []HTTPRequest path := cayley.StartPath(db.store, quad.IRI("hy:HTTPRequest")).In(quad.IRI(rdf.Type)) - err := path.Iterate(ctx).EachValue(nil, func(v quad.Value) { + err := path.Iterate(ctx).EachValue(db.store, func(v quad.Value) { var req HTTPRequest if err := db.schema.LoadToDepth(ctx, db.store, &req, -1, v); err != nil { log.Printf("[ERROR] Could not load sub-graph for http requests: %v", err) @@ -130,10 +135,20 @@ func (db *Database) FindAllRequestLogs(ctx context.Context) ([]reqlog.Request, e reqLogs = append(reqLogs, reqLog) } + // By default, all retrieved requests are ordered chronologically, oldest first. + // Reverse the order, so newest logs are first. + for i := len(reqLogs)/2 - 1; i >= 0; i-- { + opp := len(reqLogs) - 1 - i + reqLogs[i], reqLogs[opp] = reqLogs[opp], reqLogs[i] + } + return reqLogs, nil } func (db *Database) FindRequestLogByID(ctx context.Context, id uuid.UUID) (reqlog.Request, error) { + db.mu.Lock() + defer db.mu.Unlock() + var req HTTPRequest err := db.schema.LoadTo(ctx, db.store, &req, iriFromUUID(id)) if schema.IsNotFound(err) { @@ -152,6 +167,9 @@ func (db *Database) FindRequestLogByID(ctx context.Context, id uuid.UUID) (reqlo } func (db *Database) AddRequestLog(ctx context.Context, reqLog reqlog.Request) error { + db.mu.Lock() + defer db.mu.Unlock() + httpReq := HTTPRequest{ ID: iriFromUUID(reqLog.ID), Proto: reqLog.Request.Proto, @@ -162,18 +180,25 @@ func (db *Database) AddRequestLog(ctx context.Context, reqLog reqlog.Request) er Timestamp: reqLog.Timestamp, } - qw := graph.NewWriter(db.store) - defer qw.Close() + tx := cayley.NewTransaction() + qw := graph.NewTxWriter(tx, graph.Add) _, err := db.schema.WriteAsQuads(qw, httpReq) if err != nil { return fmt.Errorf("cayley: could not write quads: %v", err) } + if err := db.store.ApplyTransaction(tx); err != nil { + return fmt.Errorf("cayley: could not apply transaction: %v", err) + } + return nil } func (db *Database) AddResponseLog(ctx context.Context, resLog reqlog.Response) error { + db.mu.Lock() + defer db.mu.Unlock() + httpRes := HTTPResponse{ RequestID: iriFromUUID(resLog.RequestID), Proto: resLog.Response.Proto, @@ -184,14 +209,18 @@ func (db *Database) AddResponseLog(ctx context.Context, resLog reqlog.Response) Timestamp: resLog.Timestamp, } - qw := graph.NewWriter(db.store) - defer qw.Close() + tx := cayley.NewTransaction() + qw := graph.NewTxWriter(tx, graph.Add) _, err := db.schema.WriteAsQuads(qw, httpRes) if err != nil { return fmt.Errorf("cayley: could not write response quads: %v", err) } + if err := db.store.ApplyTransaction(tx); err != nil { + return fmt.Errorf("cayley: could not apply transaction: %v", err) + } + return nil } diff --git a/pkg/reqlog/reqlog.go b/pkg/reqlog/reqlog.go index 4c003b6..b5d3747 100644 --- a/pkg/reqlog/reqlog.go +++ b/pkg/reqlog/reqlog.go @@ -60,6 +60,18 @@ func (svc *Service) addRequest(ctx context.Context, reqID uuid.UUID, req http.Re } func (svc *Service) addResponse(ctx context.Context, reqID uuid.UUID, res http.Response, body []byte) error { + if res.Header.Get("Content-Encoding") == "gzip" { + gzipReader, err := gzip.NewReader(bytes.NewBuffer(body)) + if err != nil { + return fmt.Errorf("reqlog: could not create gzip reader: %v", err) + } + defer gzipReader.Close() + body, err = ioutil.ReadAll(gzipReader) + if err != nil { + return fmt.Errorf("reqlog: could not read gzipped response body: %v", err) + } + } + resLog := Response{ RequestID: reqID, Response: res, @@ -93,10 +105,11 @@ func (svc *Service) RequestModifier(next proxy.RequestModifyFunc) proxy.RequestM return } - err := svc.addRequest(req.Context(), reqID, *clone, body) - if err != nil { - log.Printf("[ERROR] Could not store request log: %v", err) - } + go func() { + if err := svc.addRequest(context.Background(), reqID, *clone, body); err != nil { + log.Printf("[ERROR] Could not store request log: %v", err) + } + }() } } @@ -106,6 +119,11 @@ func (svc *Service) ResponseModifier(next proxy.ResponseModifyFunc) proxy.Respon return err } + reqID, _ := res.Request.Context().Value(proxy.ReqIDKey).(uuid.UUID) + if reqID == uuid.Nil { + return errors.New("reqlog: request is missing ID") + } + clone := *res // TODO: Use io.LimitReader. @@ -115,26 +133,11 @@ func (svc *Service) ResponseModifier(next proxy.ResponseModifyFunc) proxy.Respon } res.Body = ioutil.NopCloser(bytes.NewBuffer(body)) - if res.Header.Get("Content-Encoding") == "gzip" { - gzipReader, err := gzip.NewReader(bytes.NewBuffer(body)) - if err != nil { - return fmt.Errorf("reqlog: could not create gzip reader: %v", err) + go func() { + if err := svc.addResponse(res.Request.Context(), reqID, clone, body); err != nil { + log.Printf("[ERROR] Could not store response log: %v", err) } - defer gzipReader.Close() - body, err = ioutil.ReadAll(gzipReader) - if err != nil { - return fmt.Errorf("reqlog: could not read gzipped response body: %v", err) - } - } - - reqID, _ := res.Request.Context().Value(proxy.ReqIDKey).(uuid.UUID) - if reqID == uuid.Nil { - return errors.New("reqlog: request is missing ID") - } - - if err := svc.addResponse(res.Request.Context(), reqID, clone, body); err != nil { - return fmt.Errorf("reqlog: could not add response: %v", err) - } + }() return nil }