初始化仓库

This commit is contained in:
Shikong 2022-11-20 16:37:13 +08:00
parent 03a3a309ad
commit 79d84638cb
10 changed files with 231 additions and 0 deletions

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/go-rabbitmq-study.iml" filepath="$PROJECT_DIR$/.idea/go-rabbitmq-study.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

38
exchange/direct.go Normal file
View File

@ -0,0 +1,38 @@
package exchange
import (
"github.com/streadway/amqp"
"log"
"skcks.cn/study/rabbitmq/types"
)
const DefaultDirectExchangeName = "direct_exchange"
func DeclareDirect(channel *amqp.Channel, name string, options ...types.DeclareOptionFunc) error {
defaultOption := &types.DeclareOption{
Name: name,
Kind: "direct",
Durable: true,
AutoDelete: false, //auto-deleted
Internal: false, //internal
NoWait: false, //noWait
Args: nil,
}
option := types.MergeExchangeOptionsWithDefault(defaultOption, options)
// 声明 exchange
if err := channel.ExchangeDeclare(
option.Name, //name
option.Kind, //exchangeType
option.Durable, //durable
option.AutoDelete, //auto-deleted
option.Internal, //internal
option.NoWait, //noWait
option.Args, // , //arguments
); err != nil {
log.Println("Failed to declare a exchange:", err.Error())
return err
}
return nil
}

5
go.mod Normal file
View File

@ -0,0 +1,5 @@
module skcks.cn/study/rabbitmq
go 1.19
require github.com/streadway/amqp v1.0.0 // indirect

2
go.sum Normal file
View File

@ -0,0 +1,2 @@
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=

38
main.go Normal file
View File

@ -0,0 +1,38 @@
package main
import (
"github.com/streadway/amqp"
"log"
"skcks.cn/study/rabbitmq/exchange"
"skcks.cn/study/rabbitmq/queue"
)
const ServerURL = "amqp://shikong:12341234@10.10.10.100:15672"
func main() {
conn, err := amqp.Dial(ServerURL)
defer func(conn *amqp.Connection) {
_ = conn.Close()
}(conn)
if err != nil {
log.Fatalln(err)
}
log.Println("连接成功")
channel, err := conn.Channel()
errors := make([]error, 0)
err = exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName)
errors = append(errors, exchange.DeclareDirect(channel, exchange.DefaultDirectExchangeName))
errors = append(errors, queue.DeclareDirect(channel, "direct_queue1"))
errors = append(errors, queue.DeclareDirect(channel, "direct_queue2"))
if len(errors) > 0 {
for _, err := range errors {
if err != nil {
log.Fatalln(err)
}
}
}
}

33
queue/queue.go Normal file
View File

@ -0,0 +1,33 @@
package queue
import (
"github.com/streadway/amqp"
"log"
"skcks.cn/study/rabbitmq/types"
)
func DeclareDirect(channel *amqp.Channel, name string, options ...types.DeclareOptionFunc) error {
defaultOption := &types.DeclareOption{
Name: name,
Durable: true,
AutoDelete: false, //auto-deleted
Internal: false, //internal
NoWait: false, //noWait
Args: nil,
}
option := types.MergeExchangeOptionsWithDefault(defaultOption, options)
// 声明一个queue
if _, err := channel.QueueDeclare(
option.Name, // name
option.Durable, // durable
option.AutoDelete, // delete when unused
option.Exclusive, // exclusive
option.NoWait, // no-wait
option.Args, // arguments
); err != nil {
log.Println("Failed to declare a queue:", err.Error())
return err
}
return nil
}

84
types/options.go Normal file
View File

@ -0,0 +1,84 @@
package types
import "github.com/streadway/amqp"
type DeclareOptions = []DeclareOptionFunc
type DeclareOption struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Exclusive bool
Args amqp.Table
}
type DeclareOptionFunc func(option *DeclareOption)
func NewExchangeOption(name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table) *DeclareOption {
return &DeclareOption{Name: name, Kind: kind, Durable: durable, AutoDelete: autoDelete, Internal: internal, NoWait: noWait, Args: args}
}
func Name(name string) DeclareOptionFunc {
return func(option *DeclareOption) {
option.Name = name
}
}
func ExchangeDirectKind() DeclareOptionFunc {
return func(option *DeclareOption) {
option.Kind = "direct"
}
}
func Durable(val bool) DeclareOptionFunc {
return func(option *DeclareOption) {
option.Durable = val
}
}
func AutoDelete(val bool) DeclareOptionFunc {
return func(option *DeclareOption) {
option.AutoDelete = val
}
}
func Internal(val bool) DeclareOptionFunc {
return func(option *DeclareOption) {
option.Internal = val
}
}
func NoWait(val bool) DeclareOptionFunc {
return func(option *DeclareOption) {
option.NoWait = val
}
}
func Args(args amqp.Table) DeclareOptionFunc {
return func(option *DeclareOption) {
option.Args = args
}
}
func QueueExclusive(val bool) DeclareOptionFunc {
return func(option *DeclareOption) {
option.Exclusive = val
}
}
func MergeExchangeOptions(options DeclareOptions) *DeclareOption {
defaultOption := &DeclareOption{}
return MergeExchangeOptionsWithDefault(defaultOption, options)
}
func MergeExchangeOptionsWithDefault(defaultOption *DeclareOption, options DeclareOptions) *DeclareOption {
option := defaultOption
for _, exchangeOption := range options {
exchangeOption(option)
}
return option
}