mirror of
https://github.com/mariocandela/beelzebub.git
synced 2025-07-01 18:47:26 -04:00
Refactored initialization Beelzebub core with Builder Pattern (#14)
* Fixed deprecated function * Coded builder pattern * coded the first version builder * coded rabbitmq into Builder.go * Refactored builder, and configured director * refactoring rabbitmq eventtracing * Refactoring builder, managed close connections * Fixed typos Co-authored-by: mariocandela <mario.candela@nttdata.com>
This commit is contained in:
125
builder/Builder.go
Normal file
125
builder/Builder.go
Normal file
@ -0,0 +1,125 @@
|
||||
package builder
|
||||
|
||||
import (
|
||||
"beelzebub/parser"
|
||||
"beelzebub/protocols"
|
||||
"beelzebub/tracer"
|
||||
"errors"
|
||||
"fmt"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
const RabbitmqQueueName = "event"
|
||||
|
||||
type Builder struct {
|
||||
beelzebubServicesConfiguration []parser.BeelzebubServiceConfiguration
|
||||
traceStrategy tracer.Strategy
|
||||
rabbitMQChannel *amqp.Channel
|
||||
rabbitMQConnection *amqp.Connection
|
||||
logsFile *os.File
|
||||
}
|
||||
|
||||
func (b *Builder) setTraceStrategy(traceStrategy tracer.Strategy) {
|
||||
b.traceStrategy = traceStrategy
|
||||
}
|
||||
|
||||
func (b *Builder) buildLogger(configurations parser.Logging) error {
|
||||
logsFile, err := os.OpenFile(configurations.LogsPath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.SetOutput(io.MultiWriter(os.Stdout, logsFile))
|
||||
|
||||
log.SetFormatter(&log.JSONFormatter{
|
||||
DisableTimestamp: configurations.LogDisableTimestamp,
|
||||
})
|
||||
log.SetReportCaller(configurations.DebugReportCaller)
|
||||
if configurations.Debug {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
} else {
|
||||
log.SetLevel(log.InfoLevel)
|
||||
}
|
||||
b.logsFile = logsFile
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *Builder) buildRabbitMQ(rabbitMQURI string) error {
|
||||
rabbitMQConnection, err := amqp.Dial(rabbitMQURI)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.rabbitMQChannel, err = rabbitMQConnection.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.
|
||||
if _, err = b.rabbitMQChannel.QueueDeclare(RabbitmqQueueName, false, false, false, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.rabbitMQConnection = rabbitMQConnection
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Builder) Close() error {
|
||||
if err := b.rabbitMQChannel.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.rabbitMQConnection.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.logsFile.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Builder) Run() error {
|
||||
// Init Protocol strategies
|
||||
secureShellStrategy := &protocols.SecureShellStrategy{}
|
||||
hypertextTransferProtocolStrategy := &protocols.HypertextTransferProtocolStrategy{}
|
||||
transmissionControlProtocolStrategy := &protocols.TransmissionControlProtocolStrategy{}
|
||||
|
||||
// Init Tracer strategies, and set the trace strategy default HTTP
|
||||
protocolManager := protocols.InitProtocolManager(b.traceStrategy, hypertextTransferProtocolStrategy)
|
||||
|
||||
for _, beelzebubServiceConfiguration := range b.beelzebubServicesConfiguration {
|
||||
switch beelzebubServiceConfiguration.Protocol {
|
||||
case "http":
|
||||
protocolManager.SetProtocolStrategy(hypertextTransferProtocolStrategy)
|
||||
break
|
||||
case "ssh":
|
||||
protocolManager.SetProtocolStrategy(secureShellStrategy)
|
||||
break
|
||||
case "tcp":
|
||||
protocolManager.SetProtocolStrategy(transmissionControlProtocolStrategy)
|
||||
break
|
||||
default:
|
||||
log.Fatalf("Protocol %s not managed", beelzebubServiceConfiguration.Protocol)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := protocolManager.InitService(beelzebubServiceConfiguration); err != nil {
|
||||
return errors.New(fmt.Sprintf("Error during init protocol: %s, %s", beelzebubServiceConfiguration.Protocol, err.Error()))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Builder) build() *Builder {
|
||||
return &Builder{
|
||||
beelzebubServicesConfiguration: b.beelzebubServicesConfiguration,
|
||||
traceStrategy: b.traceStrategy,
|
||||
}
|
||||
}
|
||||
|
||||
func NewBuilder() *Builder {
|
||||
return &Builder{}
|
||||
}
|
67
builder/Director.go
Normal file
67
builder/Director.go
Normal file
@ -0,0 +1,67 @@
|
||||
package builder
|
||||
|
||||
import (
|
||||
"beelzebub/parser"
|
||||
"beelzebub/tracer"
|
||||
"context"
|
||||
"encoding/json"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type Director struct {
|
||||
builder *Builder
|
||||
}
|
||||
|
||||
func NewDirector(builder *Builder) *Director {
|
||||
return &Director{
|
||||
builder: builder,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Director) BuildBeelzebub(beelzebubCoreConfigurations *parser.BeelzebubCoreConfigurations, beelzebubServicesConfiguration []parser.BeelzebubServiceConfiguration) (*Builder, error) {
|
||||
d.builder.beelzebubServicesConfiguration = beelzebubServicesConfiguration
|
||||
|
||||
if err := d.builder.buildLogger(beelzebubCoreConfigurations.Core.Logging); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
d.builder.setTraceStrategy(d.standardOutStrategy)
|
||||
|
||||
if beelzebubCoreConfigurations.Core.Tracing.RabbitMQEnabled {
|
||||
d.builder.setTraceStrategy(d.rabbitMQTraceStrategy)
|
||||
err := d.builder.buildRabbitMQ(beelzebubCoreConfigurations.Core.Tracing.RabbitMQURI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return d.builder.build(), nil
|
||||
}
|
||||
|
||||
func (d *Director) standardOutStrategy(event tracer.Event) {
|
||||
log.WithFields(log.Fields{
|
||||
"status": event.Status,
|
||||
"event": event,
|
||||
}).Info("New Event")
|
||||
}
|
||||
|
||||
func (d *Director) rabbitMQTraceStrategy(event tracer.Event) {
|
||||
log.WithFields(log.Fields{
|
||||
"status": event.Status,
|
||||
"event": event,
|
||||
}).Info("New Event")
|
||||
|
||||
log.Debug("Push Event on queue")
|
||||
eventJSON, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
publishing := amqp.Publishing{ContentType: "application/json", Body: eventJSON}
|
||||
|
||||
if err = d.builder.rabbitMQChannel.PublishWithContext(context.TODO(), "", RabbitmqQueueName, false, false, publishing); err != nil {
|
||||
log.Error(err.Error())
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user