package main

import (
	"log"
	"testing"
	"time"

	"github.com/goccy/go-json"
	"github.com/streadway/amqp"

	"skcks.cn/study/rabbitmq/exchange"
	"skcks.cn/study/rabbitmq/queue"
)

// TestConsumer is a manual integration test for the direct-exchange example.
//
// It dials the broker at ServerURL, declares the default direct exchange,
// declares two queues, binds them under the "teacher" and "student" routing
// keys, and then logs every delivery received on either queue.
//
// The test blocks until both delivery channels are closed (the amqp client
// closes them when the AMQP channel or connection shuts down), so in normal
// use it is stopped with CTRL+C.
func TestConsumer(t *testing.T) {
	conn, err := amqp.Dial(ServerURL)
	if err != nil {
		t.Fatalf("dial broker: %v", err)
	}
	// Registered only after the error check so a failed dial never tries to
	// close a nil connection.
	defer func() {
		if err := conn.Close(); err != nil {
			t.Logf("close connection: %v", err)
		}
	}()
	log.Println("连接成功")

	channel, err := conn.Channel()
	if err != nil {
		t.Fatalf("open channel: %v", err)
	}
	defer func() {
		if err := channel.Close(); err != nil {
			t.Logf("close channel: %v", err)
		}
	}()

	// Declare the exchange.
	if err := exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName); err != nil {
		t.Fatalf("declare exchange: %v", err)
	}

	// Queue name, routing key and consumer tag for each subscription.
	bindings := []struct {
		name, key, consumer string
	}{
		{name: "direct_queue1", key: "teacher", consumer: "consumer1"},
		{name: "direct_queue2", key: "student", consumer: "consumer2"},
	}

	// Declare the queues.
	for _, b := range bindings {
		if err := queue.DeclareDirect(channel, b.name); err != nil {
			t.Fatalf("declare queue %s: %v", b.name, err)
		}
	}

	// Bind each queue to the exchange under its routing key.
	for _, b := range bindings {
		if err := channel.QueueBind(b.name, b.key, exchange.DefaultDirectExchangeName, false, nil); err != nil {
			t.Fatalf("bind queue %s to route %s: %v", b.name, b.key, err)
		}
	}

	// Subscribe to every queue before starting any goroutine, so a failed
	// subscription aborts the test while nothing else is logging through t.
	deliveries := make([]<-chan amqp.Delivery, 0, len(bindings))
	for _, b := range bindings {
		msgs, err := channel.Consume(
			b.name,
			b.consumer,
			true,  // auto-ack
			false, // exclusive
			false, // no-local
			false, // no-wait
			nil,   // args
		)
		if err != nil {
			t.Fatalf("consume from %s: %v", b.name, err)
		}
		deliveries = append(deliveries, msgs)
	}

	// Each consumer goroutine signals on done once its delivery channel is
	// closed; the test goroutine waits for all of them before returning, so no
	// goroutine logs through t after the test has finished.
	done := make(chan struct{})
	for i, b := range bindings {
		go func(name string, msgs <-chan amqp.Delivery) {
			defer func() { done <- struct{}{} }()
			for d := range msgs {
				var msg Message
				if err := json.Unmarshal(d.Body, &msg); err != nil {
					// Without a decoded NanoTime there is no timestamp to
					// print; report the raw body instead.
					t.Logf("decode message from %s (body %q): %v", name, d.Body, err)
					continue
				}
				// NanoTime is passed as the nanosecond argument, so the
				// seconds argument must be 0; any other value shifts the
				// printed time by that many seconds.
				sentAt := time.Unix(0, msg.NanoTime)
				t.Logf("接收 %s 消息: %s %s", name, d.Body, sentAt.Format("2006-01-02 15:04:05.000000000"))
			}
		}(b.name, deliveries[i])
	}

	log.Printf("Waiting for messages. To exit press CTRL+C")
	for range bindings {
		<-done
	}
}