From 79d84638cb7cb9c5efe541064aabfa89e0fd91b0 Mon Sep 17 00:00:00 2001 From: Shikong <919411476@qq.com> Date: Sun, 20 Nov 2022 16:37:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96=E4=BB=93=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/.gitignore | 8 ++++ .idea/go-rabbitmq-study.iml | 9 ++++ .idea/modules.xml | 8 ++++ .idea/vcs.xml | 6 +++ exchange/direct.go | 38 +++++++++++++++++ go.mod | 5 +++ go.sum | 2 + main.go | 38 +++++++++++++++++ queue/queue.go | 33 +++++++++++++++ types/options.go | 84 +++++++++++++++++++++++++++++++++++++ 10 files changed, 231 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/go-rabbitmq-study.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 exchange/direct.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 queue/queue.go create mode 100644 types/options.go diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/go-rabbitmq-study.iml b/.idea/go-rabbitmq-study.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/go-rabbitmq-study.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..3185882 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/exchange/direct.go b/exchange/direct.go new file mode 100644 index 0000000..4fb8205 --- /dev/null +++ b/exchange/direct.go @@ -0,0 +1,38 @@ +package exchange + +import ( + "github.com/streadway/amqp" + "log" + "skcks.cn/study/rabbitmq/types" +) + +const DefaultDirectExchangeName = "direct_exchange" + +func DeclareDirect(channel *amqp.Channel, name string, options ...types.DeclareOptionFunc) error { + defaultOption := &types.DeclareOption{ + Name: name, + Kind: "direct", + Durable: true, + AutoDelete: false, //auto-deleted + Internal: false, //internal + NoWait: false, //noWait + Args: nil, + } + + option := types.MergeExchangeOptionsWithDefault(defaultOption, options) + + // 声明 exchange + if err := channel.ExchangeDeclare( + option.Name, //name + option.Kind, //exchangeType + option.Durable, //durable + option.AutoDelete, //auto-deleted + option.Internal, //internal + option.NoWait, //noWait + option.Args, // , //arguments + ); err != nil { + log.Println("Failed to declare a exchange:", err.Error()) + return err + } + return nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4f144fd --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module skcks.cn/study/rabbitmq + +go 1.19 + +require github.com/streadway/amqp v1.0.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..75f2157 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/main.go b/main.go new file mode 100644 index 0000000..16ac210 --- /dev/null +++ b/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "github.com/streadway/amqp" + "log" + "skcks.cn/study/rabbitmq/exchange" + "skcks.cn/study/rabbitmq/queue" +) + +const ServerURL = "amqp://shikong:12341234@10.10.10.100:15672" + +func main() { + conn, err := amqp.Dial(ServerURL) + defer func(conn *amqp.Connection) { + _ = conn.Close() + }(conn) + + if err != nil { + log.Fatalln(err) + } + + log.Println("连接成功") + + channel, err := conn.Channel() + + errors := make([]error, 0) + err = exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName) + errors = append(errors, exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName)) + errors = append(errors, queue.DeclareDirect(channel, "direct_queue1")) + errors = append(errors, queue.DeclareDirect(channel, "direct_queue2")) + if len(errors) > 0 { + for _, err := range errors { + if err != nil { + log.Fatalln(err) + } + } + } +} diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 0000000..ddf2f19 --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,33 @@ +package queue + +import ( + "github.com/streadway/amqp" + "log" + "skcks.cn/study/rabbitmq/types" +) + +func DeclareDirect(channel *amqp.Channel, name string, options ...types.DeclareOptionFunc) error { + defaultOption := &types.DeclareOption{ + Name: name, + Durable: true, + AutoDelete: false, //auto-deleted + Internal: false, //internal + NoWait: false, //noWait + Args: nil, + } + + option := types.MergeExchangeOptionsWithDefault(defaultOption, options) + // 声明一个queue + if _, err := channel.QueueDeclare( + option.Name, // name + option.Durable, // durable + option.AutoDelete, // delete when unused + option.Exclusive, // exclusive + option.NoWait, // no-wait + option.Args, // arguments + ); err != nil { + log.Println("Failed to declare a queue:", err.Error()) + return err + } + return nil +} diff --git a/types/options.go b/types/options.go new file mode 100644 index 0000000..1e5349e --- /dev/null +++ b/types/options.go @@ -0,0 +1,84 @@ +package types + +import "github.com/streadway/amqp" + +type DeclareOptions = []DeclareOptionFunc + +type DeclareOption struct { + Name string + Kind string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + Exclusive bool + Args amqp.Table +} + +type DeclareOptionFunc func(option *DeclareOption) + +func NewExchangeOption(name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table) *DeclareOption { + return &DeclareOption{Name: name, Kind: kind, Durable: durable, AutoDelete: autoDelete, Internal: internal, NoWait: noWait, Args: args} +} + +func Name(name string) DeclareOptionFunc { + return func(option *DeclareOption) { + option.Name = name + } +} + +func ExchangeDirectKind() DeclareOptionFunc { + return func(option *DeclareOption) { + option.Kind = "direct" + } +} + +func Durable(val bool) DeclareOptionFunc { + return func(option *DeclareOption) { + option.Durable = val + } +} + +func AutoDelete(val bool) DeclareOptionFunc { + return func(option *DeclareOption) { + option.AutoDelete = val + } +} + +func Internal(val bool) DeclareOptionFunc { + return func(option *DeclareOption) { + option.Internal = val + } +} + +func NoWait(val bool) DeclareOptionFunc { + return func(option *DeclareOption) { + option.NoWait = val + } +} + +func Args(args amqp.Table) DeclareOptionFunc { + return func(option *DeclareOption) { + option.Args = args + } +} + +func QueueExclusive(val bool) DeclareOptionFunc { + return func(option *DeclareOption) { + option.Exclusive = val + } +} + +func MergeExchangeOptions(options DeclareOptions) *DeclareOption { + defaultOption := &DeclareOption{} + return MergeExchangeOptionsWithDefault(defaultOption, options) +} + +func MergeExchangeOptionsWithDefault(defaultOption *DeclareOption, options DeclareOptions) *DeclareOption { + option := defaultOption + + for _, exchangeOption := range options { + exchangeOption(option) + } + return option +}