diff --git a/consumer_test.go b/consumer_test.go new file mode 100644 index 0000000..21e99c4 --- /dev/null +++ b/consumer_test.go @@ -0,0 +1,96 @@ +package main + +import ( + "github.com/goccy/go-json" + "github.com/streadway/amqp" + "log" + "skcks.cn/study/rabbitmq/exchange" + "skcks.cn/study/rabbitmq/queue" + "testing" + "time" +) + +func TestConsumer(t *testing.T) { + conn, err := amqp.Dial(ServerURL) + defer func(conn *amqp.Connection) { + _ = conn.Close() + }(conn) + + if err != nil { + log.Fatalln(err) + } + + log.Println("连接成功") + + channel, err := conn.Channel() + defer func(channel *amqp.Channel) { + err := channel.Close() + if err != nil { + + } + }(channel) + + errors := make([]error, 0) + // 声明交换机 + err = exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName) + errors = append(errors, exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName)) + + queue1 := "direct_queue1" + queue2 := "direct_queue2" + route1 := "teacher" + route2 := "student" + // 声明队列 + errors = append(errors, queue.DeclareDirect(channel, queue1)) + errors = append(errors, queue.DeclareDirect(channel, queue2)) + + // 声明交换机与队列绑定及路由关系 + errors = append(errors, channel.QueueBind(queue1, route1, exchange.DefaultDirectExchangeName, false, nil)) + errors = append(errors, channel.QueueBind(queue2, route2, exchange.DefaultDirectExchangeName, false, nil)) + + if len(errors) > 0 { + for _, err := range errors { + if err != nil { + log.Fatalln(err) + } + } + } + + messages1, err := channel.Consume( + queue1, + "consumer1", + true, + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + + messages2, err := channel.Consume( + queue2, + "consumer2", + true, + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + + forever := make(chan bool) + go func() { + for d := range messages1 { + var msg = &Message{} + _ = json.Unmarshal(d.Body, msg) + t.Logf("接收 %s 消息: %s %s", queue1, d.Body, time.Unix(-1, msg.NanoTime).Format("2006-01-02 15:04:05.000000000")) + } + }() + go func() { + for d := range messages2 { + var msg = &Message{} + _ = json.Unmarshal(d.Body, msg) + t.Logf("接收 %s 消息: %s %s", queue2, d.Body, time.Unix(-1, msg.NanoTime).Format("2006-01-02 15:04:05.000000000")) + } + }() + + log.Printf("Waiting for messages. To exit press CTRL+C") + <-forever +} diff --git a/go.mod b/go.mod index 4f144fd..532fb45 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module skcks.cn/study/rabbitmq go 1.19 -require github.com/streadway/amqp v1.0.0 // indirect +require ( + github.com/goccy/go-json v0.9.11 // indirect + github.com/streadway/amqp v1.0.0 // indirect +) diff --git a/go.sum b/go.sum index 75f2157..18a8b5a 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ +github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk= +github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/main.go b/main.go index 16ac210..fa61de1 100644 --- a/main.go +++ b/main.go @@ -4,11 +4,20 @@ import ( "github.com/streadway/amqp" "log" "skcks.cn/study/rabbitmq/exchange" + "skcks.cn/study/rabbitmq/publish" "skcks.cn/study/rabbitmq/queue" + "time" ) const ServerURL = "amqp://shikong:12341234@10.10.10.100:15672" +type Message struct { + Time int64 + MilliTime int64 + MicroTime int64 + NanoTime int64 +} + func main() { conn, err := amqp.Dial(ServerURL) defer func(conn *amqp.Connection) { @@ -22,12 +31,56 @@ func main() { log.Println("连接成功") channel, err := conn.Channel() + defer func(channel *amqp.Channel) { + err := channel.Close() + if err != nil { + + } + }(channel) errors := make([]error, 0) + // 声明交换机 err = exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName) errors = append(errors, exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName)) - errors = append(errors, queue.DeclareDirect(channel, "direct_queue1")) - errors = append(errors, queue.DeclareDirect(channel, "direct_queue2")) + + queue1 := "direct_queue1" + queue2 := "direct_queue2" + route1 := "teacher" + route2 := "student" + // 声明队列 + errors = append(errors, queue.DeclareDirect(channel, queue1)) + errors = append(errors, queue.DeclareDirect(channel, queue2)) + + // 声明交换机与队列绑定及路由关系 + errors = append(errors, channel.QueueBind(queue1, route1, exchange.DefaultDirectExchangeName, false, nil)) + errors = append(errors, channel.QueueBind(queue2, route2, exchange.DefaultDirectExchangeName, false, nil)) + + ch := make(chan bool) + go func() { + for i := 0; i < 10; i++ { + for j := 0; j < 1000_0000; j++ { + _ = publish.Send(channel, + exchange.DefaultDirectExchangeName, route1, false, false, &Message{ + Time: time.Now().Unix(), + MilliTime: time.Now().UnixMilli(), + MicroTime: time.Now().UnixMicro(), + NanoTime: time.Now().UnixNano(), + }, + ) + _ = publish.Send(channel, + exchange.DefaultDirectExchangeName, route2, false, false, &Message{ + Time: time.Now().Unix(), + MilliTime: time.Now().UnixMilli(), + MicroTime: time.Now().UnixMicro(), + NanoTime: time.Now().UnixNano(), + }, + ) + } + } + + ch <- true + }() + if len(errors) > 0 { for _, err := range errors { if err != nil { @@ -35,4 +88,6 @@ func main() { } } } + + <-ch } diff --git a/publish/publish.go b/publish/publish.go new file mode 100644 index 0000000..2884fd6 --- /dev/null +++ b/publish/publish.go @@ -0,0 +1,32 @@ +package publish + +import ( + "github.com/goccy/go-json" + "github.com/streadway/amqp" + "log" +) + +func Send(channel *amqp.Channel, exchange, route string, mandatory, immediate bool, data interface{}) error { + bytes, err := json.Marshal(data) + if err != nil { + return err + } + + if err = channel.Publish( + exchange, // exchange + route, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + Headers: amqp.Table{}, + ContentType: "text/plain", + ContentEncoding: "", + Body: bytes, + //Expiration: "60000", // 消息过期时间 + }, + ); err != nil { + log.Println("Failed to publish a message:", err.Error()) + return err + } + return nil +}