mirror of
https://github.com/dstotijn/hetty.git
synced 2025-07-01 18:47:29 -04:00
Replace GraphQL server with Connect RPC
This commit is contained in:
@ -5,21 +5,19 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/oklog/ulid"
|
||||
connect "connectrpc.com/connect"
|
||||
"github.com/oklog/ulid/v2"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/dstotijn/hetty/pkg/filter"
|
||||
httppb "github.com/dstotijn/hetty/pkg/http"
|
||||
"github.com/dstotijn/hetty/pkg/reqlog"
|
||||
"github.com/dstotijn/hetty/pkg/scope"
|
||||
)
|
||||
|
||||
//nolint:gosec
|
||||
var ulidEntropy = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
var defaultHTTPClient = &http.Client{
|
||||
Transport: &HTTPTransport{},
|
||||
Timeout: 30 * time.Second,
|
||||
@ -31,20 +29,14 @@ var (
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
activeProjectID ulid.ULID
|
||||
findReqsFilter FindRequestsFilter
|
||||
activeProjectID string
|
||||
reqsFilter *RequestsFilter
|
||||
scope *scope.Scope
|
||||
repo Repository
|
||||
reqLogSvc *reqlog.Service
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
type FindRequestsFilter struct {
|
||||
ProjectID ulid.ULID
|
||||
OnlyInScope bool
|
||||
SearchExpr filter.Expression
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Scope *scope.Scope
|
||||
Repository Repository
|
||||
@ -71,165 +63,215 @@ func NewService(cfg Config) *Service {
|
||||
return svc
|
||||
}
|
||||
|
||||
type Request struct {
|
||||
ID ulid.ULID
|
||||
ProjectID ulid.ULID
|
||||
SourceRequestLogID ulid.ULID
|
||||
func (svc *Service) GetRequestByID(ctx context.Context, req *connect.Request[GetRequestByIDRequest]) (*connect.Response[GetRequestByIDResponse], error) {
|
||||
if svc.activeProjectID == "" {
|
||||
return nil, connect.NewError(connect.CodeFailedPrecondition, ErrProjectIDMustBeSet)
|
||||
}
|
||||
|
||||
URL *url.URL
|
||||
Method string
|
||||
Proto string
|
||||
Header http.Header
|
||||
Body []byte
|
||||
|
||||
Response *reqlog.ResponseLog
|
||||
}
|
||||
|
||||
func (svc *Service) FindRequestByID(ctx context.Context, id ulid.ULID) (Request, error) {
|
||||
req, err := svc.repo.FindSenderRequestByID(ctx, svc.activeProjectID, id)
|
||||
senderReq, err := svc.repo.FindSenderRequestByID(ctx, svc.activeProjectID, req.Msg.RequestId)
|
||||
if err != nil {
|
||||
return Request{}, fmt.Errorf("sender: failed to find request: %w", err)
|
||||
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("sender: failed to find request: %w", err))
|
||||
}
|
||||
|
||||
return req, nil
|
||||
return &connect.Response[GetRequestByIDResponse]{
|
||||
Msg: &GetRequestByIDResponse{Request: senderReq},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (svc *Service) FindRequests(ctx context.Context) ([]Request, error) {
|
||||
return svc.repo.FindSenderRequests(ctx, svc.findReqsFilter, svc.scope)
|
||||
}
|
||||
|
||||
func (svc *Service) CreateOrUpdateRequest(ctx context.Context, req Request) (Request, error) {
|
||||
if svc.activeProjectID.Compare(ulid.ULID{}) == 0 {
|
||||
return Request{}, ErrProjectIDMustBeSet
|
||||
}
|
||||
|
||||
if req.ID.Compare(ulid.ULID{}) == 0 {
|
||||
req.ID = ulid.MustNew(ulid.Timestamp(time.Now()), ulidEntropy)
|
||||
}
|
||||
|
||||
req.ProjectID = svc.activeProjectID
|
||||
|
||||
if req.Method == "" {
|
||||
req.Method = http.MethodGet
|
||||
}
|
||||
|
||||
if req.Proto == "" {
|
||||
req.Proto = HTTPProto20
|
||||
}
|
||||
|
||||
if !isValidProto(req.Proto) {
|
||||
return Request{}, fmt.Errorf("sender: unsupported HTTP protocol: %v", req.Proto)
|
||||
}
|
||||
|
||||
err := svc.repo.StoreSenderRequest(ctx, req)
|
||||
func (svc *Service) ListRequests(ctx context.Context, req *connect.Request[ListRequestsRequest]) (*connect.Response[ListRequestsResponse], error) {
|
||||
reqs, err := svc.repo.FindSenderRequests(ctx, svc.activeProjectID, svc.filterRequest)
|
||||
if err != nil {
|
||||
return Request{}, fmt.Errorf("sender: failed to store request: %w", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("sender: failed to find requests: %w", err))
|
||||
}
|
||||
|
||||
return req, nil
|
||||
return &connect.Response[ListRequestsResponse]{
|
||||
Msg: &ListRequestsResponse{Requests: reqs},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (svc *Service) CloneFromRequestLog(ctx context.Context, reqLogID ulid.ULID) (Request, error) {
|
||||
if svc.activeProjectID.Compare(ulid.ULID{}) == 0 {
|
||||
return Request{}, ErrProjectIDMustBeSet
|
||||
func (svc *Service) filterRequest(req *Request) (bool, error) {
|
||||
if svc.reqsFilter.OnlyInScope {
|
||||
if svc.scope != nil && !req.MatchScope(svc.scope) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
reqLog, err := svc.reqLogSvc.FindRequestLogByID(ctx, reqLogID)
|
||||
if svc.reqsFilter.SearchExpr == "" {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
expr, err := filter.ParseQuery(svc.reqsFilter.SearchExpr)
|
||||
if err != nil {
|
||||
return Request{}, fmt.Errorf("sender: failed to find request log: %w", err)
|
||||
return false, fmt.Errorf("failed to parse search expression: %w", err)
|
||||
}
|
||||
|
||||
req := Request{
|
||||
ID: ulid.MustNew(ulid.Timestamp(time.Now()), ulidEntropy),
|
||||
ProjectID: svc.activeProjectID,
|
||||
SourceRequestLogID: reqLogID,
|
||||
Method: reqLog.Method,
|
||||
URL: reqLog.URL,
|
||||
Proto: HTTPProto20, // Attempt HTTP/2.
|
||||
Header: reqLog.Header,
|
||||
Body: reqLog.Body,
|
||||
}
|
||||
|
||||
err = svc.repo.StoreSenderRequest(ctx, req)
|
||||
match, err := req.Matches(expr)
|
||||
if err != nil {
|
||||
return Request{}, fmt.Errorf("sender: failed to store request: %w", err)
|
||||
return false, fmt.Errorf("failed to match search expression for sender request (id: %v): %w",
|
||||
req.Id, err,
|
||||
)
|
||||
}
|
||||
|
||||
return req, nil
|
||||
return match, nil
|
||||
}
|
||||
|
||||
func (svc *Service) SetFindReqsFilter(filter FindRequestsFilter) {
|
||||
svc.findReqsFilter = filter
|
||||
}
|
||||
func (svc *Service) CreateOrUpdateRequest(ctx context.Context, req *connect.Request[CreateOrUpdateRequestRequest]) (*connect.Response[CreateOrUpdateRequestResponse], error) {
|
||||
if svc.activeProjectID == "" {
|
||||
return nil, connect.NewError(connect.CodeFailedPrecondition, ErrProjectIDMustBeSet)
|
||||
}
|
||||
|
||||
func (svc *Service) FindReqsFilter() FindRequestsFilter {
|
||||
return svc.findReqsFilter
|
||||
}
|
||||
r := proto.Clone(req.Msg.Request).(*Request)
|
||||
|
||||
func (svc *Service) SendRequest(ctx context.Context, id ulid.ULID) (Request, error) {
|
||||
req, err := svc.repo.FindSenderRequestByID(ctx, svc.activeProjectID, id)
|
||||
if r == nil {
|
||||
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("sender: request is nil"))
|
||||
}
|
||||
|
||||
if r.HttpRequest == nil {
|
||||
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("sender: request.http_request is nil"))
|
||||
}
|
||||
|
||||
if r.Id == "" {
|
||||
r.Id = ulid.Make().String()
|
||||
}
|
||||
|
||||
r.ProjectId = svc.activeProjectID
|
||||
|
||||
if r.HttpRequest.Method == httppb.Method_METHOD_UNSPECIFIED {
|
||||
r.HttpRequest.Method = httppb.Method_METHOD_GET
|
||||
}
|
||||
|
||||
if r.HttpRequest.Protocol == httppb.Protocol_PROTOCOL_UNSPECIFIED {
|
||||
r.HttpRequest.Protocol = httppb.Protocol_PROTOCOL_HTTP20
|
||||
}
|
||||
|
||||
err := svc.repo.StoreSenderRequest(ctx, r)
|
||||
if err != nil {
|
||||
return Request{}, fmt.Errorf("sender: failed to find request: %w", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("sender: failed to store request: %w", err))
|
||||
}
|
||||
|
||||
return &connect.Response[CreateOrUpdateRequestResponse]{
|
||||
Msg: &CreateOrUpdateRequestResponse{
|
||||
Request: r,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (svc *Service) CloneFromRequestLog(ctx context.Context, req *connect.Request[CloneFromRequestLogRequest]) (*connect.Response[CloneFromRequestLogResponse], error) {
|
||||
if svc.activeProjectID == "" {
|
||||
return nil, connect.NewError(connect.CodeFailedPrecondition, ErrProjectIDMustBeSet)
|
||||
}
|
||||
|
||||
reqLog, err := svc.reqLogSvc.FindRequestLogByID(ctx, req.Msg.RequestLogId)
|
||||
if err != nil {
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("sender: failed to find request log: %w", err))
|
||||
}
|
||||
|
||||
clonedReqLog := proto.Clone(reqLog).(*reqlog.HttpRequestLog)
|
||||
|
||||
senderReq := &Request{
|
||||
Id: ulid.Make().String(),
|
||||
ProjectId: svc.activeProjectID,
|
||||
SourceRequestLogId: clonedReqLog.Id,
|
||||
HttpRequest: clonedReqLog.Request,
|
||||
}
|
||||
|
||||
err = svc.repo.StoreSenderRequest(ctx, senderReq)
|
||||
if err != nil {
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("sender: failed to store request: %w", err))
|
||||
}
|
||||
|
||||
return &connect.Response[CloneFromRequestLogResponse]{Msg: &CloneFromRequestLogResponse{
|
||||
Request: senderReq,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func (svc *Service) SetRequestsFilter(filter *RequestsFilter) {
|
||||
svc.reqsFilter = filter
|
||||
}
|
||||
|
||||
func (svc *Service) RequestsFilter() *RequestsFilter {
|
||||
return svc.reqsFilter
|
||||
}
|
||||
|
||||
func (svc *Service) SendRequest(ctx context.Context, connReq *connect.Request[SendRequestRequest]) (*connect.Response[SendRequestResponse], error) {
|
||||
req, err := svc.repo.FindSenderRequestByID(ctx, svc.activeProjectID, connReq.Msg.RequestId)
|
||||
if err != nil {
|
||||
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("sender: failed to find request: %w", err))
|
||||
}
|
||||
|
||||
httpReq, err := parseHTTPRequest(ctx, req)
|
||||
if err != nil {
|
||||
return Request{}, fmt.Errorf("sender: failed to parse HTTP request: %w", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("sender: failed to parse HTTP request: %w", err))
|
||||
}
|
||||
|
||||
resLog, err := svc.sendHTTPRequest(httpReq)
|
||||
httpRes, err := svc.sendHTTPRequest(httpReq)
|
||||
if err != nil {
|
||||
return Request{}, fmt.Errorf("sender: could not send HTTP request: %w", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("sender: could not send HTTP request: %w", err))
|
||||
}
|
||||
|
||||
req.Response = &resLog
|
||||
req.HttpResponse = httpRes
|
||||
|
||||
err = svc.repo.StoreSenderRequest(ctx, req)
|
||||
if err != nil {
|
||||
return Request{}, fmt.Errorf("sender: failed to store sender response log: %w", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("sender: failed to store sender response log: %w", err))
|
||||
}
|
||||
|
||||
req.Response = &resLog
|
||||
|
||||
return req, nil
|
||||
return &connect.Response[SendRequestResponse]{
|
||||
Msg: &SendRequestResponse{
|
||||
Request: req,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func parseHTTPRequest(ctx context.Context, req Request) (*http.Request, error) {
|
||||
ctx = context.WithValue(ctx, protoCtxKey{}, req.Proto)
|
||||
func parseHTTPRequest(ctx context.Context, req *Request) (*http.Request, error) {
|
||||
ctx = context.WithValue(ctx, protoCtxKey{}, req.GetHttpRequest().GetProtocol())
|
||||
|
||||
httpReq, err := http.NewRequestWithContext(ctx, req.Method, req.URL.String(), bytes.NewReader(req.Body))
|
||||
httpReq, err := http.NewRequestWithContext(ctx,
|
||||
req.GetHttpRequest().GetMethod().String(),
|
||||
req.GetHttpRequest().GetUrl(),
|
||||
bytes.NewReader(req.GetHttpRequest().GetBody()),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to construct HTTP request: %w", err)
|
||||
}
|
||||
|
||||
if req.Header != nil {
|
||||
httpReq.Header = req.Header
|
||||
for _, header := range req.GetHttpRequest().GetHeaders() {
|
||||
httpReq.Header.Add(header.Key, header.Value)
|
||||
}
|
||||
|
||||
return httpReq, nil
|
||||
}
|
||||
|
||||
func (svc *Service) sendHTTPRequest(httpReq *http.Request) (reqlog.ResponseLog, error) {
|
||||
func (svc *Service) sendHTTPRequest(httpReq *http.Request) (*httppb.Response, error) {
|
||||
res, err := svc.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return reqlog.ResponseLog{}, &SendError{err}
|
||||
return nil, &SendError{err}
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
resLog, err := reqlog.ParseHTTPResponse(res)
|
||||
resLog, err := httppb.ParseHTTPResponse(res)
|
||||
if err != nil {
|
||||
return reqlog.ResponseLog{}, fmt.Errorf("failed to parse http response: %w", err)
|
||||
return nil, fmt.Errorf("failed to parse http response: %w", err)
|
||||
}
|
||||
|
||||
return resLog, err
|
||||
}
|
||||
|
||||
func (svc *Service) SetActiveProjectID(id ulid.ULID) {
|
||||
func (svc *Service) SetActiveProjectID(id string) {
|
||||
svc.activeProjectID = id
|
||||
}
|
||||
|
||||
func (svc *Service) DeleteRequests(ctx context.Context, projectID ulid.ULID) error {
|
||||
return svc.repo.DeleteSenderRequests(ctx, projectID)
|
||||
func (svc *Service) DeleteRequests(ctx context.Context, req *connect.Request[DeleteRequestsRequest]) (*connect.Response[DeleteRequestsResponse], error) {
|
||||
if svc.activeProjectID == "" {
|
||||
return nil, connect.NewError(connect.CodeFailedPrecondition, ErrProjectIDMustBeSet)
|
||||
}
|
||||
|
||||
err := svc.repo.DeleteSenderRequests(ctx, svc.activeProjectID)
|
||||
if err != nil {
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("sender: failed to delete requests: %w", err))
|
||||
}
|
||||
|
||||
return &connect.Response[DeleteRequestsResponse]{}, nil
|
||||
}
|
||||
|
||||
func (e SendError) Error() string {
|
||||
|
Reference in New Issue
Block a user