From 1085ab405e714a912ac2044bd54a8f1106fb2ce0 Mon Sep 17 00:00:00 2001 From: Mario Date: Sat, 21 May 2022 11:43:10 +0200 Subject: [PATCH] Configured tracing on rabbitMQ --- main.go | 97 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 48 insertions(+), 49 deletions(-) diff --git a/main.go b/main.go index 5e4235d..514a3bc 100644 --- a/main.go +++ b/main.go @@ -4,35 +4,49 @@ import ( "beelzebub/parser" "beelzebub/protocols" "beelzebub/tracer" - "context" + "encoding/json" + "fmt" log "github.com/sirupsen/logrus" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/mongo/readpref" + "github.com/streadway/amqp" "io" "os" - "time" ) var quit = make(chan struct{}) -var mongoClient *mongo.Client +var queue amqp.Queue +var channel amqp.Channel func main() { parser := parser.Init("./configurations/beelzebub.yaml", "./configurations/services/") coreConfigurations, err := parser.ReadConfigurationsCore() - if err != nil { - log.Fatal(err) - } + failOnError(err, fmt.Sprintf("Error during coreConfigurations: ")) fileLogs := configureLoggingByConfigurations(coreConfigurations.Core.Logging) defer fileLogs.Close() beelzebubServicesConfiguration, err := parser.ReadConfigurationsServices() - if err != nil { - log.Fatal(err) + failOnError(err, fmt.Sprintf("Error during ReadConfigurationsServices: ")) + + if coreConfigurations.Core.Tracing.RabbitMQEnabled { + conn, err := amqp.Dial(coreConfigurations.Core.Tracing.RabbitMQURI) + failOnError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + + channel, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer channel.Close() + + queue, err = channel.QueueDeclare( + "event", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") } // Init Protocol strategies @@ -42,16 +56,6 @@ func main() { // Init protocol manager, with simple log on stout trace strategy and default protocol HTTP protocolManager := protocols.InitProtocolManager(traceStrategyStdout, hypertextTransferProtocolStrategy) - if coreConfigurations.Core.Tracing.MongoEnabled { - mongoClient = buildMongoClient(coreConfigurations.Core.Tracing.MongoURI) - defer func(mongoClient *mongo.Client, ctx context.Context) { - err := mongoClient.Disconnect(ctx) - if err != nil { - log.Error(err) - } - }(mongoClient, context.TODO()) - } - for _, beelzebubServiceConfiguration := range beelzebubServicesConfiguration { switch beelzebubServiceConfiguration.Protocol { case "http": @@ -66,29 +70,38 @@ func main() { } err := protocolManager.InitService(beelzebubServiceConfiguration) - if err != nil { - log.Errorf("Error during init protocol: %s, %s", beelzebubServiceConfiguration.Protocol, err.Error()) - } + failOnError(err, fmt.Sprintf("Error during init protocol: %s, ", beelzebubServiceConfiguration.Protocol)) } <-quit } + +func failOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + } +} + func traceStrategyStdout(event tracer.Event) { log.WithFields(log.Fields{ "status": event.Status, "event": event, }).Info("New Event") - if mongoClient != nil { - coll := mongoClient.Database("beelzebub").Collection("event") - data, err := bson.Marshal(event) - if err != nil { - log.Fatal(err) - } + //TODO check amqp.Channe + if queue != (amqp.Queue{}) { + eventJSON, err := json.Marshal(event) + failOnError(err, "Failed to publish a message") - _, err = coll.InsertOne(context.TODO(), data) - if err != nil { - log.Fatal(err) - } + err = channel.Publish( + "", + queue.Name, + false, + false, + amqp.Publishing{ + ContentType: "application/json", + Body: eventJSON, + }) + failOnError(err, "Failed to publish a message") } } @@ -111,17 +124,3 @@ func configureLoggingByConfigurations(configurations parser.Logging) *os.File { } return file } - -func buildMongoClient(uri string) *mongo.Client { - // Create a new client and connect to the server - client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri).SetServerSelectionTimeout(time.Second*2)) - if err != nil { - log.Fatal(err) - } - // Ping the primary - if err := client.Ping(context.TODO(), readpref.Primary()); err != nil { - log.Fatal(err) - } - log.Println("Successfully connected and pinged.") - return client -}