package EventBus
import "sync"
// 定义数据结构
type DataEvent struct {
Data interface{}
Topic string
}
// DataChannel是一个能接收 DataEvent 的 channel
type DataChannel chan DataEvent
// DataChannelSlice 是一个包含 DataChannels 数据的切片
type DataChannelSlice []DataChannel
// 定义事件总线 EventBus 存储有关订阅者感兴趣的特定主题的信息
type EventBus struct {
Subscribers map[string]DataChannelSlice
rm sync.RWMutex
}
// 发布主题 发布者需要提供广播给订阅者所需要的主题和数据
func (eb *EventBus) Publish(topic string, data interface{}) {
eb.rm.RLock()
if chans, found := eb.Subscribers[topic]; found {
// 这样做是因为切片引用相同的数组,即使它们是按值传递的
// 因此我们正在使用我们的元素创建一个新切片,从而正确地保持锁定
channels := append(DataChannelSlice{}, chans...) //切片赋值
//使用Goroutine 来避免阻塞发布者
go func(data DataEvent, dataChannelSlices DataChannelSlice) {
for _, ch := range dataChannelSlices {
ch <- data
}
}(DataEvent{Data: data, Topic: topic}, channels)
}
eb.rm.RUnlock()
}
// 订阅主题 如传统方法回调一样。当发布者向主题发布数据时,channel将接收数据。
func (eb *EventBus) Subscribe(topic string, ch DataChannel) {
eb.rm.Lock()
if prev, found := eb.Subscribers[topic]; found {
eb.Subscribers[topic] = append(prev, ch)
} else {
eb.Subscribers[topic] = append([]DataChannel{}, ch)
}
eb.rm.Unlock()
}
共有 0 - Golang—实现简单的事件总线(发布订阅模式)