Configured tracing on rabbitMQ

This commit is contained in:
Mario
2022-05-21 11:43:10 +02:00
parent ab2705f9c0
commit 1085ab405e

97
main.go
View File

@ -4,35 +4,49 @@ import (
"beelzebub/parser" "beelzebub/parser"
"beelzebub/protocols" "beelzebub/protocols"
"beelzebub/tracer" "beelzebub/tracer"
"context" "encoding/json"
"fmt"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/bson" "github.com/streadway/amqp"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"io" "io"
"os" "os"
"time"
) )
var quit = make(chan struct{}) var quit = make(chan struct{})
var mongoClient *mongo.Client var queue amqp.Queue
var channel amqp.Channel
func main() { func main() {
parser := parser.Init("./configurations/beelzebub.yaml", "./configurations/services/") parser := parser.Init("./configurations/beelzebub.yaml", "./configurations/services/")
coreConfigurations, err := parser.ReadConfigurationsCore() coreConfigurations, err := parser.ReadConfigurationsCore()
if err != nil { failOnError(err, fmt.Sprintf("Error during coreConfigurations: "))
log.Fatal(err)
}
fileLogs := configureLoggingByConfigurations(coreConfigurations.Core.Logging) fileLogs := configureLoggingByConfigurations(coreConfigurations.Core.Logging)
defer fileLogs.Close() defer fileLogs.Close()
beelzebubServicesConfiguration, err := parser.ReadConfigurationsServices() beelzebubServicesConfiguration, err := parser.ReadConfigurationsServices()
if err != nil { failOnError(err, fmt.Sprintf("Error during ReadConfigurationsServices: "))
log.Fatal(err)
if coreConfigurations.Core.Tracing.RabbitMQEnabled {
conn, err := amqp.Dial(coreConfigurations.Core.Tracing.RabbitMQURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
channel, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
queue, err = channel.QueueDeclare(
"event", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
} }
// Init Protocol strategies // Init Protocol strategies
@ -42,16 +56,6 @@ func main() {
// Init protocol manager, with simple log on stout trace strategy and default protocol HTTP // Init protocol manager, with simple log on stout trace strategy and default protocol HTTP
protocolManager := protocols.InitProtocolManager(traceStrategyStdout, hypertextTransferProtocolStrategy) protocolManager := protocols.InitProtocolManager(traceStrategyStdout, hypertextTransferProtocolStrategy)
if coreConfigurations.Core.Tracing.MongoEnabled {
mongoClient = buildMongoClient(coreConfigurations.Core.Tracing.MongoURI)
defer func(mongoClient *mongo.Client, ctx context.Context) {
err := mongoClient.Disconnect(ctx)
if err != nil {
log.Error(err)
}
}(mongoClient, context.TODO())
}
for _, beelzebubServiceConfiguration := range beelzebubServicesConfiguration { for _, beelzebubServiceConfiguration := range beelzebubServicesConfiguration {
switch beelzebubServiceConfiguration.Protocol { switch beelzebubServiceConfiguration.Protocol {
case "http": case "http":
@ -66,29 +70,38 @@ func main() {
} }
err := protocolManager.InitService(beelzebubServiceConfiguration) err := protocolManager.InitService(beelzebubServiceConfiguration)
if err != nil { failOnError(err, fmt.Sprintf("Error during init protocol: %s, ", beelzebubServiceConfiguration.Protocol))
log.Errorf("Error during init protocol: %s, %s", beelzebubServiceConfiguration.Protocol, err.Error())
}
} }
<-quit <-quit
} }
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func traceStrategyStdout(event tracer.Event) { func traceStrategyStdout(event tracer.Event) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"status": event.Status, "status": event.Status,
"event": event, "event": event,
}).Info("New Event") }).Info("New Event")
if mongoClient != nil { //TODO check amqp.Channe
coll := mongoClient.Database("beelzebub").Collection("event") if queue != (amqp.Queue{}) {
data, err := bson.Marshal(event) eventJSON, err := json.Marshal(event)
if err != nil { failOnError(err, "Failed to publish a message")
log.Fatal(err)
}
_, err = coll.InsertOne(context.TODO(), data) err = channel.Publish(
if err != nil { "",
log.Fatal(err) queue.Name,
} false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: eventJSON,
})
failOnError(err, "Failed to publish a message")
} }
} }
@ -111,17 +124,3 @@ func configureLoggingByConfigurations(configurations parser.Logging) *os.File {
} }
return file return file
} }
func buildMongoClient(uri string) *mongo.Client {
// Create a new client and connect to the server
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri).SetServerSelectionTimeout(time.Second*2))
if err != nil {
log.Fatal(err)
}
// Ping the primary
if err := client.Ping(context.TODO(), readpref.Primary()); err != nil {
log.Fatal(err)
}
log.Println("Successfully connected and pinged.")
return client
}