发布者会将一条消息发布到交换点,后者会根据一定的路由规则或登记了相应消费者的绑定信息来讲消息的副本发到队列(或分发到多个队列)里去。对此,Quora 上的这个回答有个很好的解释。
与消息传递有关的代码
由于我们想要使用新的以及现有的代码来从我们现有的 account service 和新的 vip service 里面的 Spring Cloud 配置文件里面加载我们所需的配置,所以我们会在这里创建我们的第一个共享库。
首先在 /goblog 下创建新文件夹 common 来存放可重用的内容:
mkdir -p common/messaging
mkdir -p common/config
我们将所有与 AMQP 相关的代码放在 messaging 文件夹里面,并将配置文件放在 config 文件夹里。你也可以将 /goblog/accountservice/config 的内容复制到 /goblog/common/config 中 - 请记住,这要求我们更新先前从 account service 中导入配置代码的 import 语句。不妨看看完整源代码来查阅这部分的写法。
跟消息传递有关的代码会被封装在一个文件中,该文件将定义应用程序用来连接,发布和订阅消息的接口还有实际实现。实际上,我们用的 streadway / amqp 已经提供了很多实现 AMQP 消息传递所需的模板代码,所以这部分的具体细节也便不深究了。
在 /goblog/common/messaging 中创建一个新的 .go 文件:messagingclient.go。
让我们看看里面主要应有什么:
// 定义用来连接、发布消息、消费消息的接口
type IMessagingClient interface {
ConnectToBroker(connectionString string)
Publish(msg []byte, exchangeName string, exchangeType string) error
PublishOnQueue(msg []byte, queueName string) error
Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error
SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
Close()
}
上面这段代码定义了我们所用的消息接口。这就是我们的 “account service” 和 “vip service” 在消息传递时将要处理的问题,能通过抽象手段来消除系统的大部分复杂度。请注意,我选择了两个 “Produce” 和 “Consume” 的变体,以便与订阅/发布主题还有 direct / queue 消息传递模式合在一起使用。
接下来,我们将定义一个结构体,该结构体将持有指向 amqp.Connection
的指针,并且我们将再加上一些必要的方法,以便它能(隐式地,Go 一直以来都是这样)实现我们刚才声明的接口。
// 接口实现,封装了一个指向 amqp.Connection 的指针
type MessagingClient struct {
conn *amqp.Connection
}
接口的实现非常冗长,在此只给出其中两个 - ConnectToBroker()
和 PublishToQueue()
:
func (m *MessagingClient) ConnectToBroker(connectionString string) {
if connectionString == "" {
panic("Cannot initialize connection to broker, connectionString not set. Have you initialized?")
}
var err error
m.conn, err = amqp.Dial(fmt.Sprintf("%s/", connectionString))
if err != nil {
panic("Failed to connect to AMQP compatible broker at: " + connectionString)
}
}
这就是我们获得 connection 指针 (如 amqp.Dial
) 的方法。如果我们丢掉了配置文件,或者连接不上中继器,那么微服务就会抛出一个 panic 异常,并会让容器协调器重新创建一个新的实例。在这里传入的 connectionString 参数就如下例所示:
amqp://guest:guest@rabbitmq:5672/
注意,这里的 rabbitmq broker 是以 service 这一 Docker Swarm 的模式下运行的。
PublishOnQueue()
函数很长 - 它跟官方提供的 streadway 样例或多或少地有些不同,毕竟这里简化了它的一些参数。为了将消息发布到一个有名字的队列,我们需要传递这些参数:
- body - 以字节数组的形式存在。可以是 JSON,XML 或一些二进制文件。
- queueName - 要发送消息的队列的名称。
若要了解交换器的更多详情,那请参阅 RabbitMQ 文档。
func (m *MessagingClient) PublishOnQueue(body []byte, queueName string) error {
if m.conn == nil {
panic("Tried to send message before connection was initialized. Don't do that.")
}
ch, err := m.conn.Channel() // 从 connection 里获得一个 channel 对象
defer ch.Close()
// 提供一些参数声明一个队列,若相应的队列不存在,那就创建一个
queue, err := ch.QueueDeclare(
queueName, // 队列名
false, // 是否持久存在
false, // 是否在不用时就删掉
false, // 是否排外
false, // 是否无等待
nil, // 其他参数
)
// 往队列发布消息
err = ch.Publish(
"", // 目标为默认的交换器
queue.Name, // 路由关键字,例如队列名
false, // 必须发布
false, // 立即发布
amqp.Publishing{
ContentType: "application/json",
Body: body, // JSON 正文, 以 byte[] 形式给出
})
fmt.Printf("A message was sent to queue %v: %v", queueName, body)
return err
}
这里的模板代码略多,但应该不难理解。这段代码会声明一个(如果不存在那就创建一个)队列,然后把我们的消息以字节数组的形式发布给它。
将消息发布到一个有名字的交换器的代码会更复杂,因为它需要一段模板代码来声明交换器,以及队列,并把它们绑定在一起。这里有一份完整的源代码示例。
接下来,由于我们的 “MessageClient” 的实际使用者会是 /goblog/accountservice/service/handlers.go ,我们会往里面再添加一个字段,并在请求的帐户有 ID “10000” 的时候往硬编码进程序中的 “is VIP” 检查方法中发送一条消息:
var DBClient dbclient.IBoltClient
var MessagingClient messaging.IMessagingClient // NEW
func GetAccount(w http.ResponseWriter, r *http.Request) {
...
然后:
...
notifyVIP(account) // 并行地发送 VIP 消息
// 若有这样的 account, 那就把它弄成一个 JSON, 然后附上首部和其他内容来打包
data, _ := json.Marshal(account)
writeJsonResponse(w, http.StatusOK, data)
}
// 如果这个 account 是我们硬编码进来的 account, 那就开个协程来发送消息
func notifyVIP(account model.Account) {
if account.Id == "10000" {
go func(account model.Account) {
vipNotification := model.VipNotification{AccountId: account.Id, ReadAt: time.Now().UTC().String()}
data, _ := json.Marshal(vipNotification)
err := MessagingClient.PublishOnQueue(data, "vipQueue")
if err != nil {
fmt.Println(err.Error())
}
}(account)
}
}
正好借此机会展示一下调用一个新的协程(goroutine)时所使用的内联匿名函数,即使用 go
关键字。我们不能因为要执行 HTTP 处理程序发送消息就把 “主” 协程阻塞起来,因此这也是增加一点并行性的好时机。
main.go 也需要有所更新,以便在启动的时候能使用加载并注入到 Viper 里面的配置信息来初始化 AMQ 连接。
// 在 main 方法里面调用这个函数
func initializeMessaging() {
if !viper.IsSet("amqp_server_url") {
panic("No 'amqp_server_url' set in configuration, cannot start")
}
service.MessagingClient = &messaging.MessagingClient{}
service.MessagingClient.ConnectToBroker(viper.GetString("amqp_server_url"))
service.MessagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent)
}
这段没什么意思 - 我们通过创建一个空的消息传递结构,并使用从 Viper 获取的属性值来调用 ConnectToBroker
来得到 service.MessagingClient
实例。如果我们的配置没有 broker_url
,那就抛一个 panic 异常,毕竟在不可能连接到中继器的时候程序也没办法运行。
更新配置
我们在第 8 部分中已将 amqp_broker_url
属性添加到了我们的 .yml 配置文件里面,所以这个步骤实际上已经做过了。
broker_url: amqp://guest:guest@192.168.99.100:5672 _(dev)_
broker_url: amqp://guest:guest@rabbitmq:5672 _(test)_
注意,我们在 “test” 配置文件里面填入的是 Swarm 服务名 “rabbitmq”,而不是从我的电脑上看到的 Swarm 的 LAN IP 地址。(大家的实际 IP 地址应该会有所不同,不过运行 Docker Toolbox 时 192.168.99.100 似乎是标准配置)。
我们并不推荐在配置文件中填入用户名和密码的明文。在真实的使用环境中,我们通常可以使用在第 8 部分中看到的 Spring Cloud Config 服务器里面的内置加密功能。
单元测试
当然,我们至少应该编写一个单元测试,以确保 handlers.go 中的 GetAccount
函数在有人请求由 “10000” 标识的非常特殊的帐户时会尝试去发送一条消息。
为此,我们需要在 handlers_test.go 中实现一个模拟的 IMessagingClient
还有一个新的测试用例。我们先从模拟开始。这里我们将使用第三方工具 mockery 生成一个 IMessagingClient
接口的模拟实现(在 shell 运行下面的命令的时候一定要先把 GOPATH 设置好):
> go get github.com/vektra/mockery/.../
> cd $GOPATH/src/github.com/callistaenterprise/goblog/common/messaging
> ./$GOPATH/bin/mockery -all -output .
Generating mock for: IMessagingClient
现在,在当前文件夹中就有了一个模拟实现文件 IMessagingClient.go。我看这个文件的名字不爽,也看不惯它的驼峰命名法,因此我们将它重命名一下,让它的名字能更明显的表示它是一个模拟的实现,并且遵循本系列博客的文件名的一贯风格:
mv IMessagingClient.go mockmessagingclient.go
我们可能需要在生成的文件中稍微调整一下 import 语句,并删除一些别名。除此之外,我们会对这个模拟实现采用一种黑盒方法 - 只假设它会在我们开始测试的时候起作用。
不妨也看一看这里生成的模拟实现的源码,这跟我们在第 4 章中手动编写的内容非常相似。
在 handlers_test.go 里添加一个新的测试用例:
// 声明一个模仿类来让测试更有可读性
var anyString = mock.AnythingOfType("string")
var anyByteArray = mock.AnythingOfType("[]uint8") // == []byte
func TestNotificationIsSentForVIPAccount(t *testing.T) {
// 配置 DBClient 的模拟实现
mockRepo.On("QueryAccount", "10000").Return(model.Account{Id:"10000", Name:"Person_10000"}, nil)
DBClient = mockRepo
mockMessagingClient.On("PublishOnQueue", anyByteArray, anyString).Return(nil)
MessagingClient = mockMessagingClient
Convey("Given a HTTP req for a VIP account", t, func() {
req := httptest.NewRequest("GET", "/accounts/10000", nil)
resp := httptest.NewRecorder()
Convey("When the request is handled by the Router", func() {
NewRouter().ServeHTTP(resp, req)
Convey("Then the response should be a 200 and the MessageClient should have been invoked", func() {
So(resp.Code, ShouldEqual, 200)
time.Sleep(time.Millisecond * 10) // Sleep since the Assert below occurs in goroutine
So(mockMessagingClient.AssertNumberOfCalls(t, "PublishOnQueue", 1), ShouldBeTrue)
})
})})
}
有关的详情都写在了注释里。在此,我也看不惯在断言 numberOfCalls 的后置状态之前人为地搞个 10 ms 的睡眠,但由于模拟是在与 “主线程” 分离的协程中调用的,我们需要让它稍微挂起一段时间等待主线程完成一些工作。在此也希望能对协程和管道(channel)有一个更好的惯用的单元测试方式。
我承认 - 使用这种测试方式的过程比在为 Java 应用程序编写单元测试用例时使用 Mockito 更加冗长。不过,我还是认为它的可读性不错,写起来也很简单。
接着运行测试,并确保测试通过:
go test ./...
运行
首先要运行 springcloud.sh 脚本来更新配置服务器。然后运行 copyall.sh 并等待几秒钟,来让它完成对我们的 “account service” 的更新。然后我们再使用 curl 来获取我们的 “特殊” 帐户。
> curl http://$ManagerIP:6767/accounts/10000
{"id":"10000","name":"Person_0","servedBy":"10.255.0.11"}
若顺利的话,我们应该能够打开 RabbitMQ 的管理控制台。然后再看看我们是否在名为 vipQueue 的队列上收到了一条消息:
open http://192.168.99.100:15672/#/queues