简单的生产者/消费者实现

This commit is contained in:
Shikong 2022-11-20 23:02:01 +08:00
parent 79d84638cb
commit 8099800977
5 changed files with 191 additions and 3 deletions

96
consumer_test.go Normal file
View File

@ -0,0 +1,96 @@
package main
import (
"github.com/goccy/go-json"
"github.com/streadway/amqp"
"log"
"skcks.cn/study/rabbitmq/exchange"
"skcks.cn/study/rabbitmq/queue"
"testing"
"time"
)
func TestConsumer(t *testing.T) {
conn, err := amqp.Dial(ServerURL)
defer func(conn *amqp.Connection) {
_ = conn.Close()
}(conn)
if err != nil {
log.Fatalln(err)
}
log.Println("连接成功")
channel, err := conn.Channel()
defer func(channel *amqp.Channel) {
err := channel.Close()
if err != nil {
}
}(channel)
errors := make([]error, 0)
// 声明交换机
err = exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName)
errors = append(errors, exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName))
queue1 := "direct_queue1"
queue2 := "direct_queue2"
route1 := "teacher"
route2 := "student"
// 声明队列
errors = append(errors, queue.DeclareDirect(channel, queue1))
errors = append(errors, queue.DeclareDirect(channel, queue2))
// 声明交换机与队列绑定及路由关系
errors = append(errors, channel.QueueBind(queue1, route1, exchange.DefaultDirectExchangeName, false, nil))
errors = append(errors, channel.QueueBind(queue2, route2, exchange.DefaultDirectExchangeName, false, nil))
if len(errors) > 0 {
for _, err := range errors {
if err != nil {
log.Fatalln(err)
}
}
}
messages1, err := channel.Consume(
queue1,
"consumer1",
true,
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
messages2, err := channel.Consume(
queue2,
"consumer2",
true,
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
forever := make(chan bool)
go func() {
for d := range messages1 {
var msg = &Message{}
_ = json.Unmarshal(d.Body, msg)
t.Logf("接收 %s 消息: %s %s", queue1, d.Body, time.Unix(-1, msg.NanoTime).Format("2006-01-02 15:04:05.000000000"))
}
}()
go func() {
for d := range messages2 {
var msg = &Message{}
_ = json.Unmarshal(d.Body, msg)
t.Logf("接收 %s 消息: %s %s", queue2, d.Body, time.Unix(-1, msg.NanoTime).Format("2006-01-02 15:04:05.000000000"))
}
}()
log.Printf("Waiting for messages. To exit press CTRL+C")
<-forever
}

5
go.mod
View File

@ -2,4 +2,7 @@ module skcks.cn/study/rabbitmq
go 1.19
require github.com/streadway/amqp v1.0.0 // indirect
require (
github.com/goccy/go-json v0.9.11 // indirect
github.com/streadway/amqp v1.0.0 // indirect
)

2
go.sum
View File

@ -1,2 +1,4 @@
github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk=
github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=

59
main.go
View File

@ -4,11 +4,20 @@ import (
"github.com/streadway/amqp"
"log"
"skcks.cn/study/rabbitmq/exchange"
"skcks.cn/study/rabbitmq/publish"
"skcks.cn/study/rabbitmq/queue"
"time"
)
const ServerURL = "amqp://shikong:12341234@10.10.10.100:15672"
type Message struct {
Time int64
MilliTime int64
MicroTime int64
NanoTime int64
}
func main() {
conn, err := amqp.Dial(ServerURL)
defer func(conn *amqp.Connection) {
@ -22,12 +31,56 @@ func main() {
log.Println("连接成功")
channel, err := conn.Channel()
defer func(channel *amqp.Channel) {
err := channel.Close()
if err != nil {
}
}(channel)
errors := make([]error, 0)
// 声明交换机
err = exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName)
errors = append(errors, exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName))
errors = append(errors, queue.DeclareDirect(channel, "direct_queue1"))
errors = append(errors, queue.DeclareDirect(channel, "direct_queue2"))
queue1 := "direct_queue1"
queue2 := "direct_queue2"
route1 := "teacher"
route2 := "student"
// 声明队列
errors = append(errors, queue.DeclareDirect(channel, queue1))
errors = append(errors, queue.DeclareDirect(channel, queue2))
// 声明交换机与队列绑定及路由关系
errors = append(errors, channel.QueueBind(queue1, route1, exchange.DefaultDirectExchangeName, false, nil))
errors = append(errors, channel.QueueBind(queue2, route2, exchange.DefaultDirectExchangeName, false, nil))
ch := make(chan bool)
go func() {
for i := 0; i < 10; i++ {
for j := 0; j < 1000_0000; j++ {
_ = publish.Send(channel,
exchange.DefaultDirectExchangeName, route1, false, false, &Message{
Time: time.Now().Unix(),
MilliTime: time.Now().UnixMilli(),
MicroTime: time.Now().UnixMicro(),
NanoTime: time.Now().UnixNano(),
},
)
_ = publish.Send(channel,
exchange.DefaultDirectExchangeName, route2, false, false, &Message{
Time: time.Now().Unix(),
MilliTime: time.Now().UnixMilli(),
MicroTime: time.Now().UnixMicro(),
NanoTime: time.Now().UnixNano(),
},
)
}
}
ch <- true
}()
if len(errors) > 0 {
for _, err := range errors {
if err != nil {
@ -35,4 +88,6 @@ func main() {
}
}
}
<-ch
}

32
publish/publish.go Normal file
View File

@ -0,0 +1,32 @@
package publish
import (
"github.com/goccy/go-json"
"github.com/streadway/amqp"
"log"
)
func Send(channel *amqp.Channel, exchange, route string, mandatory, immediate bool, data interface{}) error {
bytes, err := json.Marshal(data)
if err != nil {
return err
}
if err = channel.Publish(
exchange, // exchange
route, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: bytes,
//Expiration: "60000", // 消息过期时间
},
); err != nil {
log.Println("Failed to publish a message:", err.Error())
return err
}
return nil
}