This commit is contained in:
217
services/entry/pkg/infrastructure/queue/rabbitmq.go
Normal file
217
services/entry/pkg/infrastructure/queue/rabbitmq.go
Normal file
@@ -0,0 +1,217 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
import amqp "github.com/rabbitmq/amqp091-go"
|
||||
|
||||
type RabbitMQ struct {
|
||||
logger *zap.Logger
|
||||
config *RabbitMqConfig
|
||||
conn *amqp.Connection
|
||||
}
|
||||
|
||||
var _ Queue = RabbitMQ{}
|
||||
|
||||
type RabbitMqConfig struct {
|
||||
Username string
|
||||
Password string
|
||||
Host string
|
||||
Port int
|
||||
}
|
||||
|
||||
func NewRabbitMQ(logger *zap.Logger, config *RabbitMqConfig) Queue {
|
||||
|
||||
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", config.Username, config.Password, config.Host, config.Port))
|
||||
if err != nil {
|
||||
logger.Panic("Could not dial rabbitmq", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port), zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
//sendBasic(logger, config, conn)
|
||||
//receiveBasic(logger, config, conn)
|
||||
sendPublishingBasic(logger, config, conn)
|
||||
for i := 0; i < 200; i++ {
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
receivePublishingBasic(logger, config, conn)
|
||||
}
|
||||
//sendMany(logger, config, conn)
|
||||
|
||||
return &RabbitMQ{
|
||||
logger: logger,
|
||||
config: config,
|
||||
conn: conn,
|
||||
}
|
||||
}
|
||||
|
||||
func sendBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
body := "Hello world!"
|
||||
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||
if err != nil {
|
||||
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
logger.Info("Sent message", zap.String("body", body))
|
||||
|
||||
go func() {
|
||||
defer ch.Close()
|
||||
counter := 0
|
||||
for {
|
||||
body := fmt.Sprintf("message nr: %d", counter)
|
||||
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||
if err != nil {
|
||||
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
logger.Info("Sent message", zap.String("body", body))
|
||||
counter += 1
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}()
|
||||
}
|
||||
func receiveBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
msgs, err := ch.Consume(q.Name,
|
||||
"",
|
||||
true,
|
||||
false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer ch.Close()
|
||||
|
||||
for d := range msgs {
|
||||
logger.Info("Received msg", zap.String("body", string(d.Body)))
|
||||
}
|
||||
}()
|
||||
}
|
||||
func sendMany(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
body := "Hello world!"
|
||||
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||
if err != nil {
|
||||
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
logger.Info("Sent message", zap.String("body", body))
|
||||
|
||||
go func() {
|
||||
defer ch.Close()
|
||||
counter := 0
|
||||
for {
|
||||
body := fmt.Sprintf("message nr: %d", counter)
|
||||
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||
if err != nil {
|
||||
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
logger.Info("Sent message", zap.String("body", body))
|
||||
counter += 1
|
||||
}
|
||||
}()
|
||||
}
|
||||
func sendPublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer ch.Close()
|
||||
counter := 0
|
||||
for {
|
||||
body := fmt.Sprintf("message nr: %d", counter)
|
||||
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
|
||||
DeliveryMode: amqp.Persistent,
|
||||
ContentType: "text/plain", Body: []byte(body)})
|
||||
if err != nil {
|
||||
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
logger.Info("Sent message", zap.String("body", body))
|
||||
counter += 1
|
||||
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
}
|
||||
}()
|
||||
}
|
||||
func receivePublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ch.Qos(1, 0, false)
|
||||
|
||||
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
msgs, err := ch.Consume(q.Name,
|
||||
"",
|
||||
false,
|
||||
false, false, false, nil)
|
||||
if err != nil {
|
||||
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer ch.Close()
|
||||
|
||||
for d := range msgs {
|
||||
logger.Info("Received msg", zap.String("body", string(d.Body)), zap.Int("secondsToSleep", len(d.Body)))
|
||||
t := time.Duration(len(d.Body))
|
||||
time.Sleep(t * time.Second)
|
||||
logger.Info("Received msg: Done")
|
||||
d.Ack(false)
|
||||
}
|
||||
}()
|
||||
}
|
Reference in New Issue
Block a user