Replace SQLite with BadgerDB

This commit is contained in:
David Stotijn
2022-01-21 11:45:54 +01:00
parent 8a3b3cbf02
commit d84d2d0905
49 changed files with 2496 additions and 2677 deletions

53
pkg/db/badger/badger.go Normal file
View File

@ -0,0 +1,53 @@
package badger
import (
"fmt"
"github.com/dgraph-io/badger/v3"
)
const (
// Key prefixes. Each prefix value should be unique.
projectPrefix = 0x00
reqLogPrefix = 0x01
resLogPrefix = 0x02
// Request log indices.
reqLogProjectIDIndex = 0x00
)
// Database is used to store and retrieve data from an underlying Badger database.
type Database struct {
badger *badger.DB
}
// OpenDatabase opens a new Badger database.
func OpenDatabase(opts badger.Options) (*Database, error) {
db, err := badger.Open(opts)
if err != nil {
return nil, fmt.Errorf("badger: failed to open database: %w", err)
}
return &Database{badger: db}, nil
}
// Close closes the underlying Badger database.
func (db *Database) Close() error {
return db.badger.Close()
}
// DatabaseFromBadgerDB returns a Database with `db` set as the underlying
// Badger database.
func DatabaseFromBadgerDB(db *badger.DB) *Database {
return &Database{badger: db}
}
func entryKey(prefix, index byte, value []byte) []byte {
// Key consists of: | prefix (byte) | index (byte) | value
key := make([]byte, 2+len(value))
key[0] = prefix
key[1] = index
copy(key[2:len(value)+2], value)
return key
}

110
pkg/db/badger/proj.go Normal file
View File

@ -0,0 +1,110 @@
package badger
import (
"bytes"
"context"
"encoding/gob"
"errors"
"fmt"
"github.com/dgraph-io/badger/v3"
"github.com/oklog/ulid"
"github.com/dstotijn/hetty/pkg/proj"
)
func (db *Database) UpsertProject(ctx context.Context, project proj.Project) error {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(project)
if err != nil {
return fmt.Errorf("badger: failed to encode project: %w", err)
}
err = db.badger.Update(func(txn *badger.Txn) error {
return txn.Set(entryKey(projectPrefix, 0, project.ID[:]), buf.Bytes())
})
if err != nil {
return fmt.Errorf("badger: failed to commit transaction: %w", err)
}
return nil
}
func (db *Database) FindProjectByID(ctx context.Context, projectID ulid.ULID) (project proj.Project, err error) {
err = db.badger.View(func(txn *badger.Txn) error {
item, err := txn.Get(entryKey(projectPrefix, 0, projectID[:]))
if err != nil {
return err
}
err = item.Value(func(rawProject []byte) error {
return gob.NewDecoder(bytes.NewReader(rawProject)).Decode(&project)
})
if err != nil {
return fmt.Errorf("failed to retrieve or parse project: %w", err)
}
return nil
})
if errors.Is(err, badger.ErrKeyNotFound) {
return proj.Project{}, proj.ErrProjectNotFound
}
if err != nil {
return proj.Project{}, fmt.Errorf("badger: failed to commit transaction: %w", err)
}
return project, nil
}
func (db *Database) DeleteProject(ctx context.Context, projectID ulid.ULID) error {
err := db.ClearRequestLogs(ctx, projectID)
if err != nil {
return fmt.Errorf("badger: failed to delete project request logs: %w", err)
}
err = db.badger.Update(func(txn *badger.Txn) error {
return txn.Delete(entryKey(projectPrefix, 0, projectID[:]))
})
if err != nil {
return fmt.Errorf("badger: failed to delete project item: %w", err)
}
return nil
}
func (db *Database) Projects(ctx context.Context) ([]proj.Project, error) {
projects := make([]proj.Project, 0)
err := db.badger.View(func(txn *badger.Txn) error {
var rawProject []byte
prefix := entryKey(projectPrefix, 0, nil)
iterator := txn.NewIterator(badger.DefaultIteratorOptions)
defer iterator.Close()
for iterator.Seek(prefix); iterator.ValidForPrefix(prefix); iterator.Next() {
rawProject, err := iterator.Item().ValueCopy(rawProject)
if err != nil {
return fmt.Errorf("failed to copy value: %w", err)
}
var project proj.Project
err = gob.NewDecoder(bytes.NewReader(rawProject)).Decode(&project)
if err != nil {
return fmt.Errorf("failed to decode project: %w", err)
}
projects = append(projects, project)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("badger: failed to commit transaction: %w", err)
}
return projects, nil
}

284
pkg/db/badger/proj_test.go Normal file
View File

@ -0,0 +1,284 @@
package badger
import (
"bytes"
"context"
"encoding/gob"
"errors"
"math/rand"
"regexp"
"testing"
"time"
badgerdb "github.com/dgraph-io/badger/v3"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/oklog/ulid"
"github.com/dstotijn/hetty/pkg/proj"
"github.com/dstotijn/hetty/pkg/scope"
"github.com/dstotijn/hetty/pkg/search"
)
//nolint:gosec
var ulidEntropy = rand.New(rand.NewSource(time.Now().UnixNano()))
var regexpCompareOpt = cmp.Comparer(func(x, y *regexp.Regexp) bool {
switch {
case x == nil && y == nil:
return true
case x == nil || y == nil:
return false
default:
return x.String() == y.String()
}
})
func TestUpsertProject(t *testing.T) {
t.Parallel()
badgerDB, err := badgerdb.Open(badgerdb.DefaultOptions("").WithInMemory(true))
if err != nil {
t.Fatalf("failed to open badger database: %v", err)
}
database := DatabaseFromBadgerDB(badgerDB)
defer database.Close()
searchExpr, err := search.ParseQuery("foo AND bar OR NOT baz")
if err != nil {
t.Fatalf("unexpected error (expected: nil, got: %v)", err)
}
exp := proj.Project{
ID: ulid.MustNew(ulid.Timestamp(time.Now()), ulidEntropy),
Name: "foobar",
Settings: proj.Settings{
ReqLogBypassOutOfScope: true,
ReqLogOnlyFindInScope: true,
ScopeRules: []scope.Rule{
{
URL: regexp.MustCompile("^https://(.*)example.com(.*)$"),
Header: scope.Header{
Key: regexp.MustCompile("^X-Foo(.*)$"),
Value: regexp.MustCompile("^foo(.*)$"),
},
Body: regexp.MustCompile("^foo(.*)"),
},
},
SearchExpr: searchExpr,
},
}
err = database.UpsertProject(context.Background(), exp)
if err != nil {
t.Fatalf("unexpected error storing project: %v", err)
}
var rawProject []byte
err = badgerDB.View(func(txn *badgerdb.Txn) error {
item, err := txn.Get(entryKey(projectPrefix, 0, exp.ID[:]))
if err != nil {
return err
}
rawProject, err = item.ValueCopy(nil)
return err
})
if err != nil {
t.Fatalf("unexpected error retrieving project from database: %v", err)
}
got := proj.Project{}
err = gob.NewDecoder(bytes.NewReader(rawProject)).Decode(&got)
if err != nil {
t.Fatalf("unexpected error decoding project: %v", err)
}
if diff := cmp.Diff(exp, got, regexpCompareOpt, cmpopts.IgnoreUnexported(proj.Project{})); diff != "" {
t.Fatalf("project not equal (-exp, +got):\n%v", diff)
}
}
func TestFindProjectByID(t *testing.T) {
t.Parallel()
t.Run("existing project", func(t *testing.T) {
t.Parallel()
badgerDB, err := badgerdb.Open(badgerdb.DefaultOptions("").WithInMemory(true))
if err != nil {
t.Fatalf("failed to open badger database: %v", err)
}
database := DatabaseFromBadgerDB(badgerDB)
defer database.Close()
exp := proj.Project{
ID: ulid.MustNew(ulid.Timestamp(time.Now()), ulidEntropy),
Name: "foobar",
Settings: proj.Settings{},
}
buf := bytes.Buffer{}
err = gob.NewEncoder(&buf).Encode(exp)
if err != nil {
t.Fatalf("unexpected error encoding project: %v", err)
}
err = badgerDB.Update(func(txn *badgerdb.Txn) error {
return txn.Set(entryKey(projectPrefix, 0, exp.ID[:]), buf.Bytes())
})
if err != nil {
t.Fatalf("unexpected error setting project: %v", err)
}
got, err := database.FindProjectByID(context.Background(), exp.ID)
if err != nil {
t.Fatalf("unexpected error finding project: %v", err)
}
if diff := cmp.Diff(exp, got, cmpopts.IgnoreUnexported(proj.Project{})); diff != "" {
t.Fatalf("project not equal (-exp, +got):\n%v", diff)
}
})
t.Run("project not found", func(t *testing.T) {
t.Parallel()
database, err := OpenDatabase(badgerdb.DefaultOptions("").WithInMemory(true))
if err != nil {
t.Fatalf("failed to open badger database: %v", err)
}
defer database.Close()
projectID := ulid.MustNew(ulid.Timestamp(time.Now()), ulidEntropy)
_, err = database.FindProjectByID(context.Background(), projectID)
if !errors.Is(err, proj.ErrProjectNotFound) {
t.Fatalf("expected `proj.ErrProjectNotFound`, got: %v", err)
}
})
}
func TestDeleteProject(t *testing.T) {
t.Parallel()
badgerDB, err := badgerdb.Open(badgerdb.DefaultOptions("").WithInMemory(true))
if err != nil {
t.Fatalf("failed to open badger database: %v", err)
}
database := DatabaseFromBadgerDB(badgerDB)
defer database.Close()
// Store fixtures.
projectID := ulid.MustNew(ulid.Timestamp(time.Now()), ulidEntropy)
reqLogID := ulid.MustNew(ulid.Timestamp(time.Now()), ulidEntropy)
err = badgerDB.Update(func(txn *badgerdb.Txn) error {
if err := txn.Set(entryKey(projectPrefix, 0, projectID[:]), nil); err != nil {
return err
}
if err := txn.Set(entryKey(reqLogPrefix, 0, reqLogID[:]), nil); err != nil {
return err
}
if err := txn.Set(entryKey(resLogPrefix, 0, reqLogID[:]), nil); err != nil {
return err
}
err := txn.Set(entryKey(reqLogPrefix, reqLogProjectIDIndex, append(projectID[:], reqLogID[:]...)), nil)
if err != nil {
return err
}
return nil
})
if err != nil {
t.Fatalf("unexpected error creating fixtures: %v", err)
}
err = database.DeleteProject(context.Background(), projectID)
if err != nil {
t.Fatalf("unexpected error deleting project: %v", err)
}
// Assert project key was deleted.
err = badgerDB.View(func(txn *badgerdb.Txn) error {
_, err := txn.Get(entryKey(projectPrefix, 0, projectID[:]))
return err
})
if !errors.Is(err, badgerdb.ErrKeyNotFound) {
t.Fatalf("expected `badger.ErrKeyNotFound`, got: %v", err)
}
// Assert request log item was deleted.
err = badgerDB.View(func(txn *badgerdb.Txn) error {
_, err := txn.Get(entryKey(reqLogPrefix, 0, reqLogID[:]))
return err
})
if !errors.Is(err, badgerdb.ErrKeyNotFound) {
t.Fatalf("expected `badger.ErrKeyNotFound`, got: %v", err)
}
// Assert response log item was deleted.
err = badgerDB.View(func(txn *badgerdb.Txn) error {
_, err := txn.Get(entryKey(resLogPrefix, 0, reqLogID[:]))
return err
})
if !errors.Is(err, badgerdb.ErrKeyNotFound) {
t.Fatalf("expected `badger.ErrKeyNotFound`, got: %v", err)
}
// Assert request log project ID index key was deleted.
err = badgerDB.View(func(txn *badgerdb.Txn) error {
_, err := txn.Get(entryKey(reqLogPrefix, reqLogProjectIDIndex, append(projectID[:], reqLogID[:]...)))
return err
})
if !errors.Is(err, badgerdb.ErrKeyNotFound) {
t.Fatalf("expected `badger.ErrKeyNotFound`, got: %v", err)
}
}
func TestProjects(t *testing.T) {
t.Parallel()
database, err := OpenDatabase(badgerdb.DefaultOptions("").WithInMemory(true))
if err != nil {
t.Fatalf("failed to open badger database: %v", err)
}
defer database.Close()
exp := []proj.Project{
{
ID: ulid.MustNew(ulid.Timestamp(time.Now()), ulidEntropy),
Name: "one",
},
{
ID: ulid.MustNew(ulid.Timestamp(time.Now())+100, ulidEntropy),
Name: "two",
},
}
// Store fixtures.
for _, project := range exp {
err = database.UpsertProject(context.Background(), project)
if err != nil {
t.Fatalf("unexpected error creating project fixture: %v", err)
}
}
got, err := database.Projects(context.Background())
if err != nil {
t.Fatalf("unexpected error finding projects: %v", err)
}
if len(exp) != len(got) {
t.Fatalf("expected %v projects, got: %v", len(exp), len(got))
}
if diff := cmp.Diff(exp, got, cmpopts.IgnoreUnexported(proj.Project{})); diff != "" {
t.Fatalf("projects not equal (-exp, +got):\n%v", diff)
}
}

251
pkg/db/badger/reqlog.go Normal file
View File

@ -0,0 +1,251 @@
package badger
import (
"bytes"
"context"
"encoding/gob"
"errors"
"fmt"
"github.com/dgraph-io/badger/v3"
"github.com/oklog/ulid"
"github.com/dstotijn/hetty/pkg/reqlog"
"github.com/dstotijn/hetty/pkg/scope"
)
func (db *Database) FindRequestLogs(ctx context.Context, filter reqlog.FindRequestsFilter, scope *scope.Scope) ([]reqlog.RequestLog, error) {
if filter.ProjectID.Compare(ulid.ULID{}) == 0 {
return nil, reqlog.ErrProjectIDMustBeSet
}
txn := db.badger.NewTransaction(false)
defer txn.Discard()
reqLogIDs, err := findRequestLogIDsByProjectID(txn, filter.ProjectID)
if err != nil {
return nil, fmt.Errorf("badger: failed to find request log IDs: %w", err)
}
reqLogs := make([]reqlog.RequestLog, 0, len(reqLogIDs))
for _, reqLogID := range reqLogIDs {
reqLog, err := getRequestLogWithResponse(txn, reqLogID)
if err != nil {
return nil, fmt.Errorf("badger: failed to get request log (id: %v): %w", reqLogID.String(), err)
}
if filter.OnlyInScope {
if !reqLog.MatchScope(scope) {
continue
}
}
// Filter by search expression.
// TODO: Once pagination is introduced, this filter logic should be done
// as items are retrieved (e.g. when using a `badger.Iterator`).
if filter.SearchExpr != nil {
match, err := reqLog.Matches(filter.SearchExpr)
if err != nil {
return nil, fmt.Errorf(
"badger: failed to match search expression for request log (id: %v): %w",
reqLogID.String(), err,
)
}
if !match {
continue
}
}
reqLogs = append(reqLogs, reqLog)
}
return reqLogs, nil
}
func getRequestLogWithResponse(txn *badger.Txn, reqLogID ulid.ULID) (reqlog.RequestLog, error) {
item, err := txn.Get(entryKey(reqLogPrefix, 0, reqLogID[:]))
if err != nil {
return reqlog.RequestLog{}, fmt.Errorf("failed to lookup request log item: %w", err)
}
reqLog := reqlog.RequestLog{
ID: reqLogID,
}
err = item.Value(func(rawReqLog []byte) error {
err = gob.NewDecoder(bytes.NewReader(rawReqLog)).Decode(&reqLog)
if err != nil {
return fmt.Errorf("failed to decode request log: %w", err)
}
return nil
})
if err != nil {
return reqlog.RequestLog{}, fmt.Errorf("failed to retrieve or parse request log value: %w", err)
}
item, err = txn.Get(entryKey(resLogPrefix, 0, reqLogID[:]))
if errors.Is(err, badger.ErrKeyNotFound) {
return reqLog, nil
}
if err != nil {
return reqlog.RequestLog{}, fmt.Errorf("failed to get response log: %w", err)
}
err = item.Value(func(rawReslog []byte) error {
var resLog reqlog.ResponseLog
err = gob.NewDecoder(bytes.NewReader(rawReslog)).Decode(&resLog)
if err != nil {
return fmt.Errorf("failed to decode response log: %w", err)
}
reqLog.Response = &resLog
return nil
})
if err != nil {
return reqlog.RequestLog{}, fmt.Errorf("failed to retrieve or parse response log value: %w", err)
}
return reqLog, nil
}
func (db *Database) FindRequestLogByID(ctx context.Context, reqLogID ulid.ULID) (reqLog reqlog.RequestLog, err error) {
txn := db.badger.NewTransaction(false)
defer txn.Discard()
reqLog, err = getRequestLogWithResponse(txn, reqLogID)
if err != nil {
return reqlog.RequestLog{}, fmt.Errorf("badger: failed to get request log: %w", err)
}
return reqLog, nil
}
func (db *Database) StoreRequestLog(ctx context.Context, reqLog reqlog.RequestLog) error {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(reqLog)
if err != nil {
return fmt.Errorf("badger: failed to encode request log: %w", err)
}
entries := []*badger.Entry{
// Request log itself.
{
Key: entryKey(reqLogPrefix, 0, reqLog.ID[:]),
Value: buf.Bytes(),
},
// Index by project ID.
{
Key: entryKey(reqLogPrefix, reqLogProjectIDIndex, append(reqLog.ProjectID[:], reqLog.ID[:]...)),
},
}
err = db.badger.Update(func(txn *badger.Txn) error {
for i := range entries {
err := txn.SetEntry(entries[i])
if err != nil {
return err
}
}
return nil
})
if err != nil {
return fmt.Errorf("badger: failed to commit transaction: %w", err)
}
return nil
}
func (db *Database) StoreResponseLog(ctx context.Context, reqLogID ulid.ULID, resLog reqlog.ResponseLog) error {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(resLog)
if err != nil {
return fmt.Errorf("badger: failed to encode response log: %w", err)
}
err = db.badger.Update(func(txn *badger.Txn) error {
return txn.SetEntry(&badger.Entry{
Key: entryKey(resLogPrefix, 0, reqLogID[:]),
Value: buf.Bytes(),
})
})
if err != nil {
return fmt.Errorf("badger: failed to commit transaction: %w", err)
}
return nil
}
func (db *Database) ClearRequestLogs(ctx context.Context, projectID ulid.ULID) error {
// Note: this transaction is used just for reading; we use the `badger.WriteBatch`
// API to bulk delete items.
txn := db.badger.NewTransaction(false)
defer txn.Discard()
reqLogIDs, err := findRequestLogIDsByProjectID(txn, projectID)
if err != nil {
return fmt.Errorf("badger: failed to find request log IDs: %w", err)
}
writeBatch := db.badger.NewWriteBatch()
defer writeBatch.Cancel()
for _, reqLogID := range reqLogIDs {
// Delete request logs.
err := writeBatch.Delete(entryKey(reqLogPrefix, 0, reqLogID[:]))
if err != nil {
return fmt.Errorf("badger: failed to delete request log: %w", err)
}
// Delete related response log.
err = writeBatch.Delete(entryKey(resLogPrefix, 0, reqLogID[:]))
if err != nil {
return fmt.Errorf("badger: failed to delete request log: %w", err)
}
}
if err := writeBatch.Flush(); err != nil {
return fmt.Errorf("badger: failed to commit batch write: %w", err)
}
err = db.badger.DropPrefix(entryKey(reqLogPrefix, reqLogProjectIDIndex, projectID[:]))
if err != nil {
return fmt.Errorf("badger: failed to drop request log project ID index items: %w", err)
}
return nil
}
func findRequestLogIDsByProjectID(txn *badger.Txn, projectID ulid.ULID) ([]ulid.ULID, error) {
reqLogIDs := make([]ulid.ULID, 0)
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
iterator := txn.NewIterator(opts)
defer iterator.Close()
var projectIndexKey []byte
prefix := entryKey(reqLogPrefix, reqLogProjectIDIndex, projectID[:])
for iterator.Seek(prefix); iterator.ValidForPrefix(prefix); iterator.Next() {
projectIndexKey = iterator.Item().KeyCopy(projectIndexKey)
var id ulid.ULID
// The request log ID starts *after* the first 2 prefix and index bytes
// and the 16 byte project ID.
if err := id.UnmarshalBinary(projectIndexKey[18:]); err != nil {
return nil, fmt.Errorf("failed to parse request log ID: %w", err)
}
reqLogIDs = append(reqLogIDs, id)
}
return reqLogIDs, nil
}

View File

@ -0,0 +1,121 @@
package badger
import (
"context"
"errors"
"net/http"
"net/url"
"testing"
"time"
badgerdb "github.com/dgraph-io/badger/v3"
"github.com/google/go-cmp/cmp"
"github.com/oklog/ulid"
"github.com/dstotijn/hetty/pkg/reqlog"
)
func TestFindRequestLogs(t *testing.T) {
t.Parallel()
t.Run("without project ID in filter", func(t *testing.T) {
t.Parallel()
database, err := OpenDatabase(badgerdb.DefaultOptions("").WithInMemory(true))
if err != nil {
t.Fatalf("failed to open badger database: %v", err)
}
defer database.Close()
filter := reqlog.FindRequestsFilter{}
_, err = database.FindRequestLogs(context.Background(), filter, nil)
if !errors.Is(err, reqlog.ErrProjectIDMustBeSet) {
t.Fatalf("expected `reqlog.ErrProjectIDMustBeSet`, got: %v", err)
}
})
t.Run("returns request logs and related response logs", func(t *testing.T) {
t.Parallel()
database, err := OpenDatabase(badgerdb.DefaultOptions("").WithInMemory(true))
if err != nil {
t.Fatalf("failed to open badger database: %v", err)
}
defer database.Close()
projectID := ulid.MustNew(ulid.Timestamp(time.Now()), ulidEntropy)
exp := []reqlog.RequestLog{
{
ID: ulid.MustNew(ulid.Timestamp(time.Now()), ulidEntropy),
ProjectID: projectID,
URL: mustParseURL(t, "https://example.com/foobar"),
Method: http.MethodPost,
Proto: "HTTP/1.1",
Header: http.Header{
"X-Foo": []string{"baz"},
},
Body: []byte("foo"),
Response: &reqlog.ResponseLog{
Proto: "HTTP/1.1",
Status: "200 OK",
StatusCode: 200,
Header: http.Header{
"X-Yolo": []string{"swag"},
},
Body: []byte("bar"),
},
},
{
ID: ulid.MustNew(ulid.Timestamp(time.Now())+100, ulidEntropy),
ProjectID: projectID,
URL: mustParseURL(t, "https://example.com/foo?bar=baz"),
Method: http.MethodGet,
Proto: "HTTP/1.1",
Header: http.Header{
"X-Foo": []string{"baz"},
},
},
}
// Store fixtures.
for _, reqLog := range exp {
err = database.StoreRequestLog(context.Background(), reqLog)
if err != nil {
t.Fatalf("unexpected error creating request log fixture: %v", err)
}
if reqLog.Response != nil {
err = database.StoreResponseLog(context.Background(), reqLog.ID, *reqLog.Response)
if err != nil {
t.Fatalf("unexpected error creating response log fixture: %v", err)
}
}
}
filter := reqlog.FindRequestsFilter{
ProjectID: projectID,
}
got, err := database.FindRequestLogs(context.Background(), filter, nil)
if err != nil {
t.Fatalf("unexpected error finding request logs: %v", err)
}
if diff := cmp.Diff(exp, got); diff != "" {
t.Fatalf("request logs not equal (-exp, +got):\n%v", diff)
}
})
}
func mustParseURL(t *testing.T, s string) *url.URL {
t.Helper()
u, err := url.Parse(s)
if err != nil {
panic(err)
}
return u
}