Java面试高频题:手写一个支持订阅发布的轻量级数据总线

一、数据总线设计核心原则

在分布式系统开发中,事件驱动架构是解耦服务的关键技术。设计数据总线时需遵循三大核心原则:

  1. 解耦原则:生产者与消费者通过事件通道通信,无需直接引用对方
  2. 异步非阻塞:采用观察者模式实现事件发布与订阅的异步处理
  3. 线程安全:多线程环境下保证事件通知的原子性和可见性

典型应用场景包括:

  • 微服务间的消息传递
  • UI事件处理系统
  • 实时数据监控平台
  • 分布式任务调度

二、基础架构设计

1. 核心接口定义

  1. public interface EventBus {
  2. // 注册事件监听器
  3. void register(String eventType, EventListener listener);
  4. // 注销事件监听器
  5. void unregister(String eventType, EventListener listener);
  6. // 发布事件
  7. void post(Event event);
  8. }
  9. public interface EventListener<T extends Event> {
  10. void onEvent(T event);
  11. }

2. 事件对象设计

  1. public abstract class Event {
  2. private final String eventType;
  3. private final long timestamp;
  4. private final Map<String, Object> attributes = new HashMap<>();
  5. protected Event(String eventType) {
  6. this.eventType = eventType;
  7. this.timestamp = System.currentTimeMillis();
  8. }
  9. // Getter方法及属性操作方法...
  10. }

三、核心实现方案

1. 同步事件总线实现

  1. public class SyncEventBus implements EventBus {
  2. private final ConcurrentHashMap<String, CopyOnWriteArrayList<EventListener>> registry =
  3. new ConcurrentHashMap<>();
  4. @Override
  5. public void register(String eventType, EventListener listener) {
  6. registry.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
  7. .add(listener);
  8. }
  9. @Override
  10. public void post(Event event) {
  11. List<EventListener> listeners = registry.getOrDefault(event.getEventType(), Collections.emptyList());
  12. for (EventListener listener : listeners) {
  13. try {
  14. listener.onEvent(event);
  15. } catch (Exception e) {
  16. // 异常处理逻辑
  17. }
  18. }
  19. }
  20. }

关键点解析

  • 使用ConcurrentHashMap保证线程安全的注册表
  • CopyOnWriteArrayList实现写时复制,避免并发修改异常
  • 同步调用保证事件处理顺序

2. 异步事件总线实现

  1. public class AsyncEventBus implements EventBus {
  2. private final Executor executor;
  3. private final EventBus syncBus;
  4. public AsyncEventBus(Executor executor) {
  5. this.executor = executor;
  6. this.syncBus = new SyncEventBus();
  7. }
  8. @Override
  9. public void register(String eventType, EventListener listener) {
  10. syncBus.register(eventType, listener);
  11. }
  12. @Override
  13. public void post(Event event) {
  14. executor.execute(() -> syncBus.post(event));
  15. }
  16. }

线程池配置建议

  • 核心线程数:CPU核心数 * 2
  • 最大线程数:根据系统负载动态调整
  • 任务队列:使用有界队列防止内存溢出
  • 拒绝策略:推荐使用CallerRunsPolicy

四、高级特性扩展

1. 事件过滤机制

  1. public interface FilterableEventBus extends EventBus {
  2. void register(String eventType, EventListener listener, Predicate<Event> filter);
  3. }
  4. public class FilteredEventBus implements FilterableEventBus {
  5. private final Map<String, List<FilterEntry>> registry = new ConcurrentHashMap<>();
  6. @Override
  7. public void post(Event event) {
  8. List<FilterEntry> entries = registry.getOrDefault(event.getEventType(), Collections.emptyList());
  9. entries.stream()
  10. .filter(e -> e.filter.test(event))
  11. .forEach(e -> executor.execute(() -> e.listener.onEvent(event)));
  12. }
  13. private static class FilterEntry {
  14. final EventListener listener;
  15. final Predicate<Event> filter;
  16. FilterEntry(EventListener listener, Predicate<Event> filter) {
  17. this.listener = listener;
  18. this.filter = filter;
  19. }
  20. }
  21. }

2. 事件序列化支持

  1. public interface SerializableEvent extends Event {
  2. byte[] serialize() throws IOException;
  3. static SerializableEvent deserialize(byte[] data) throws IOException, ClassNotFoundException {
  4. // 实现反序列化逻辑
  5. }
  6. }
  7. // 使用示例
  8. public class UserCreatedEvent implements SerializableEvent {
  9. private final String userId;
  10. @Override
  11. public byte[] serialize() throws IOException {
  12. try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
  13. ObjectOutputStream oos = new ObjectOutputStream(bos)) {
  14. oos.writeObject(this);
  15. return bos.toByteArray();
  16. }
  17. }
  18. }

3. 死信队列处理

  1. public class DeadLetterEventBus implements EventBus {
  2. private final EventBus primaryBus;
  3. private final EventBus deadLetterBus;
  4. private final int maxRetries;
  5. public DeadLetterEventBus(EventBus primary, EventBus deadLetter, int maxRetries) {
  6. this.primaryBus = primary;
  7. this.deadLetterBus = deadLetter;
  8. this.maxRetries = maxRetries;
  9. }
  10. @Override
  11. public void post(Event event) {
  12. AtomicInteger retryCount = event.getAttribute("retryCount", AtomicInteger.class, new AtomicInteger(0));
  13. try {
  14. primaryBus.post(event);
  15. } catch (Exception e) {
  16. if (retryCount.incrementAndGet() >= maxRetries) {
  17. deadLetterBus.post(event);
  18. } else {
  19. event.setAttribute("retryCount", retryCount);
  20. // 指数退避重试逻辑
  21. }
  22. }
  23. }
  24. }

五、性能优化策略

  1. 批量事件处理:通过BatchEvent封装多个事件,减少系统调用次数
  2. 事件批处理:设置批处理大小和等待时间阈值

    1. public class BatchEvent<T extends Event> {
    2. private final List<T> events = new ArrayList<>();
    3. private final long maxWaitMillis;
    4. private final int maxSize;
    5. public synchronized void add(T event) {
    6. events.add(event);
    7. if (events.size() >= maxSize ||
    8. (maxWaitMillis > 0 && System.currentTimeMillis() - startTime >= maxWaitMillis)) {
    9. flush();
    10. }
    11. }
    12. }
  3. 内存管理

    • 使用对象池技术复用事件对象
    • 实现弱引用监听器注册表
  4. 监控指标
    • 事件处理吞吐量
    • 平均处理延迟
    • 失败事件率

六、典型应用场景

1. 微服务通信

  1. // 服务A - 订单创建
  2. EventBus bus = new AsyncEventBus(Executors.newFixedThreadPool(4));
  3. bus.post(new OrderCreatedEvent(orderId));
  4. // 服务B - 库存处理
  5. bus.register(OrderCreatedEvent.class, event -> {
  6. inventoryService.reserve(event.getOrderId());
  7. });

2. UI事件处理

  1. // 视图层
  2. eventBus.register("user.click", event -> {
  3. analyticsService.trackClick((MouseEvent)event);
  4. });
  5. // 控制器层
  6. button.addActionListener(e -> {
  7. eventBus.post(new MouseEvent("user.click", e.getPoint()));
  8. });

3. 实时数据处理

  1. // 数据采集
  2. eventBus.post(new SensorReadingEvent(sensorId, value));
  3. // 实时计算
  4. eventBus.register(SensorReadingEvent.class, event -> {
  5. if (event.getValue() > threshold) {
  6. alertService.trigger(event.getSensorId());
  7. }
  8. });

七、面试常见问题解析

  1. 如何保证事件顺序性?

    • 单线程事件总线
    • 分区事件总线(相同key的事件路由到固定线程)
  2. 如何处理事件循环依赖?

    • 检测算法(拓扑排序)
    • 限制递归深度
    • 异步化处理
  3. 如何实现事件溯源?

    • 结合事件存储(Event Sourcing)
    • 快照机制加速恢复
  4. 跨进程事件总线方案

    • 基于消息队列(如通用消息队列服务)
    • gRPC流式通信
    • WebSocket全双工通信

八、最佳实践建议

  1. 事件设计规范

    • 使用现在时态命名(如UserCreated而非UserWasCreated
    • 包含事件版本号(@Version注解)
    • 避免包含业务逻辑
  2. 错误处理策略

    • 实现重试机制(指数退避)
    • 记录失败事件到持久化存储
    • 提供补偿事务接口
  3. 测试方案

    • 单元测试:Mock EventBus验证监听器调用
    • 集成测试:真实事件总线测试
    • 压力测试:模拟高并发事件场景

通过本文的系统讲解,读者可以全面掌握事件驱动架构的核心实现技术。从基础接口设计到高级特性扩展,从性能优化到异常处理,每个环节都包含可落地的代码实现和工程建议。这种设计模式不仅适用于面试场景,更是构建高并发、可扩展系统的关键技术方案。