事件流与事件溯源

2024年 2月 1日 62.5k 0

事件流和事件溯源是事件驱动架构中两个相关但不同的概念。

事件流是持续捕获和存储系统中发生的事件的过程。这些事件可以实时处理和分析,也可以存储以供后续分析。事件流通常用于需要实时处理大量数据的系统,如金融交易系统或社交媒体平台。

以下是使用流行的Kafka消息系统在Go中进行事件流处理的简单示例:

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
)

func main() {
    // 设置Kafka生产者以将事件发送到主题
    writer := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "my-topic",
    })

    // 发送一些事件到主题
    writer.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("key1"),
            Value: []byte("value1"),
        },
        kafka.Message{
            Key:   []byte("key2"),
            Value: []byte("value2"),
        },
    )

    // 设置Kafka消费者以从主题读取事件
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "my-topic",
    })

    // 从主题读取事件
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            break
        }
        fmt.Printf("Received message: key=%s, value=%sn", string(msg.Key), string(msg.Value))
    }
}

而事件溯源是一种构建系统的模式,将应用程序状态的所有变化存储为事件序列。这些事件然后可以用于在任何时间点重建应用程序的状态。事件溯源通常用于需要可审计性、可追溯性或合规性的系统,如金融系统或医疗系统。

以下是在Go中使用内存事件存储进行事件溯源的简单示例:


package main

import (
    "fmt"
)

type Event struct {
    Type string
    Data interface{}
}

type EventStore struct {
    events []Event
}

func (store *EventStore) Append(event Event) {
    store.events = append(store.events, event)
}

func (store *EventStore) GetEvents() []Event {
    return store.events
}

type Account struct {
    id      string
    balance int
    store   *EventStore
}

func NewAccount(id string, store *EventStore) *Account {
    return &Account{
        id:      id,
        balance: 0,
        store:   store,
    }
}

func (account *Account) Deposit(amount int) {
    event := Event{
        Type: "deposit",
        Data: amount,
    }
    account.store.Append(event)
    account.balance += amount
}

func (account *Account) Withdraw(amount int) {
    if account.balance >= amount {
        event := Event{
            Type: "withdraw",
            Data: amount,
        }
        account.store.Append(event)
        account.balance -= amount
    }
}

func (account *Account) GetBalance() int {
    return account.balance
}

func main() {
    store := &EventStore{}
    account := NewAccount("123", store)

    account.Deposit(100)
    account.Withdraw(50)
    account.Deposit(25)

    events := store.GetEvents()
    for _, event := range events {
        switch event.Type {
        case "deposit":
            amount := event.Data.(int)
            fmt.Printf("Deposited %dn", amount)
        case "withdraw":
            amount := event.Data.(int)
            fmt.Printf("Withdrew %dn", amount)
        }
    }

    fmt.Printf("Final balance: %dn", account.GetBalance())
}

事件溯源是通过将每个对聚合的修改记录为事件并将其追加到连续流中的一种方法。要重建聚合的最终状态,需要按顺序读取这些事件,然后将其应用于聚合。这与在创建、读取、更新和删除(CRUD)系统中执行的即时修改形成对比。在CRUD系统中,对记录状态的任何更改都存储在数据库中,实质上覆盖了同

一聚合的先前版本。

一旦价格变化已保存到Products表中,只更新了价格本身,而行的其余部分保持不变。然而,如图5.1所示,这种方法导致了先前价格和更改背后的上下文的丢失。

为了保留不仅新价格还包括关键元数据(如调整原因)的信息,将更改记录为Events表中的事件。先前的价格在先前事件中保持不变,以便在需要时检索。

为了实现有效的事件溯源,建议使用提供强大一致性保证并使用乐观并发控制的事件存储。在实践中,这意味着当多个修改同时发生时,只有初始修改才能附加到流中。随后的修改可能需要重试或可能会失败。

相关文章

服务器端口转发,带你了解服务器端口转发
服务器开放端口,服务器开放端口的步骤
产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
如何使用 WinGet 下载 Microsoft Store 应用
百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

发布评论