JGroups 2.8 源码阅读

2019年为处理某故障,对 JGroups 做了粗略的代码解读。目前尚有系统仍在使用 JGroups,故找出文档,分享在此。

EhCache 底层调度 JGroups 代码分析

1. JGroups 协议栈初始化与启动时序图

启动EhCache的JGroupManager

new NotificationBus(String, String)
->NotificationBus.start()
->new JChannel(String)
->this JChannel(ProtocolStackConfigurator)
->JChannel.init(ProtocolStackConfigurator)
->ProtocolStack.setup()
->Configurator.createProtocols(Vector<ProtocolConfiguration>, ProtocolStack)
->Configurator.ProtocolConfiguration.createLayer(ProtocolStack)
->Protocol.setPropertiesInternal(Properties)
->Protocol.setProperties(Properties) //此处为列表迭代调用抽象类实现,不同协议执行各自方法
->Configurator.initProtocolStack(List<Protocol>)
->Protocol.init() //此处为列表迭代调用抽象类实现,不同协议执行各自方法
->JChannel.connect(String)
->JChannel.startStack(String)
->ProtocolStack.startStack(String)
->Configurator.startProtocolStack(List<Protocol>, String, Map<String, Tuple<TP, Short>>)
->Protocol.start() //此处为列表迭代调用抽象类实现,不同协议执行各自方法
->JChannel.downcall(Event) //发出Event.CONNECT
->ProtocolStack.down(Event)
->Protocol.down(Event) //此处此处为列表迭代调用抽象类实现,不同协议执行各自方法

至此,启动完毕

服务端

sequenceDiagram
    participant App
    participant NotificationBus
    participant JChannel
    participant ProtocolStack
    participant Configurator
    participant Protocol as Protocol Layer

    App->>NotificationBus: new NotificationBus()
    NotificationBus->>JChannel: new JChannel()
    JChannel->>ProtocolStack: setup()
    ProtocolStack->>Configurator: createProtocols()
    Configurator->>Protocol: createLayer()
    Protocol->>Protocol: setPropertiesInternal()
    Protocol->>Protocol: setProperties() // 各协议配置自身
    ProtocolStack->>Configurator: initProtocolStack()
    Configurator->>Protocol: init() // 各协议初始化自身

客户端

sequenceDiagram
    participant App
    participant NotificationBus
    participant JChannel
    participant ProtocolStack
    participant Configurator
    participant Protocol as Protocol Layer

    App->>NotificationBus: new NotificationBus()
    NotificationBus->>JChannel: connect()
    JChannel->>ProtocolStack: startStack()
    ProtocolStack->>Configurator: startProtocolStack()
    Configurator->>Protocol: start() // 各协议启动自身
    JChannel->>ProtocolStack: downcall(Event.CONNECT)
    ProtocolStack->>Protocol: down(Event) // 事件向下传递

2. TCP 协议初始化与启动时序图

TCP协议

setProperties()
->createInitialHosts(String) //创建初始化列表,根据配置的节点及端口范围形成完整初始节点

init()
->BasicTCP.init() //在起始端口start_port小于等于0时检查是否使用了动态侦测协议MPING or TCPGOSSIP,没用就报错
->TP.init() //构建线程工厂类,默认线程工厂类/带外数据线程工厂类/定时器线程工厂类

start()
->getConnectionTable(long, long, InetAddress, InetAddress, int, int, PortsManager)
->new ConnectionTable(Receiver, InetAddress, InetAddress, int, int, PortsManager)
->ConnectionTable.init()
->ConnectionTable.createServerSocket(int, int) //端口是在这里绑定的,端口的自增也是在这里发生的
->BasicTCP.start()
->ConnectionTable.start()
->Thread.start() //Runnable实现是ConnectionTable.run()。该实现用以接受连入的连接,并指派处理者。处理者是交另一线程处理的。
->Thread.start() //连接收割者,无用连接会被remove。Runnable实现是BasicConnectionTable.Reaper.run()。每隔1分钟执行一次,去除空闲时间超5分钟的连接。
->BasicConnectionTable.start()

setProperties

sequenceDiagram
    participant TCP

    TCP->>TCP: createInitialHosts()

init

sequenceDiagram
    participant TCP
    participant BasicTCP
    participant TP

    TCP->>BasicTCP: BasicTCP.init()
    TCP->>TP: TP.init() // 创建线程工厂

start

sequenceDiagram
    participant TCP
    participant ConnectionTable
    participant Thread as Thread Pool
    
    TCP->>ConnectionTable: getConnectionTable()
    ConnectionTable->>ConnectionTable: init()
    ConnectionTable->>ConnectionTable: createServerSocket() // 绑定端口
    TCP->>ConnectionTable: start()
    ConnectionTable->>Thread: start() // ConnectionTable.run()
    ConnectionTable->>Thread: start() // Reaper.run()

3. TCPPING 协议发现成员时序图

TCPPING协议

init()
->timer //创建定时器,用以查找其他机器

响应Event.FIND_INITIAL_MBRS
down(Event)
->Discovery.findInitialMembers(Promise<JoinRsp>)
->Discovery.PingSenderTask.start() //交给定时器线程去处理
->Discovery.Responses.get(long) //本线程循环执行,最多等待timeout,该时间受配置影响。退出条件是1.得到足够的初始化节点 or 2.设置了num_initial_srv_members,将忽略num_initial_members,达到数量退出 or 3.得到协调者响应,直接退出 or 4.超时
->Discovery.PingSenderTask.stop()

init

sequenceDiagram
    participant TCPPING

    TCPPING->>TCPPING: timer // 创建定时器

down(Event)

sequenceDiagram
    participant TCPPING
    participant Discovery
    participant Timer as Timer Thread

    TCPPING->>Discovery: findInitialMembers()
    Discovery->>Timer: PingSenderTask.start()
    Discovery->>Timer: Responses.get(timeout)
    loop 等待响应
        Timer->>Timer: response
    end
    Note over Timer: 等待条件: 1.足够节点<br/>2.num_initial_srv_members<br/>3.协调者响应<br/>4.超时
    Discovery->>Timer: PingSenderTask.stop()

4. FD 与 VERIFY_SUSPECT 协议协作时序图

FD协议,每次view变化时,会触发监视器,超过两个节点,就通过FD向邻居发送心跳包,没有收到心跳包,会向上触发事件Event.SUSPECT

VERIFY_SUSPECT协议,这个协议需要在FD之上,GMS之下
当调用BasicTCP.sendToSingleMember(Address, byte[], int, int)出现异常时,会向上触发事件Event.SUSPECT

响应Event.SUSPECT
up(Event)
->VERIFY_SUSPECT.verifySuspect(Address) //发送ARE_YOU_DEAD消息,交给定时器线程处理,本线程等待,结果是返回或超时
->VERIFY_SUSPECT.startTimer() //用以接受回报,对确实SUSPECT的节点,继续通知上层协议,一般是由JChannel交Receiver处理应用自己关心的内容
->Protocol.down(Event) //组织消息,下送ARE_YOU_DEAD消息

up(Event)

sequenceDiagram
    participant FD as FD Protocol
    participant VS as VERIFY_SUSPECT
    participant Timer as Timer Thread
    participant LowerLayer as Lower Protocol

    par 定期检测
        FD->>FD: 定期发送心跳
    and 异常检测
        FD->>FD: 检测到心跳丢失
        FD->>VS: up(Event.SUSPECT)
    end

    VS->>VS: verifySuspect(Address)
    VS->>Timer: startTimer() // 等待回应
    VS->>LowerLayer: down(Event) // 发送ARE_YOU_DEAD
    alt 收到回应
        Timer-->>VS: 收到回应
        VS->>VS: 取消怀疑
    else 超时未回应
        Timer-->>VS: 超时
        VS->>UpperLayer: up(Event.SUSPECT) // 确认怀疑
    end

5. GMS 协议处理连接时序图

GMS协议

new GMS()
->GMS.initState()
->GMS.becomeClient() //默认先将自己作为客户端
->ClientGmsImpl.init()

响应Event.CONNECT
down(Event)
->Protocol.down(Event) //此处此处为列表迭代调用抽象类实现,不同协议执行各自方法
->ClientGmsImpl.join(Address)
->ClientGmsImpl.join(Address, boolean)
->ClientGmsImpl.findInitialMembers(Promise<JoinRsp>) //发出Event.FIND_INITIAL_MBRS,查找初始化节点消息
->根据返回结果,分支-->ClientGmsImpl.becomeSingletonMember(Address) //发出Event.BECOME_SERVER,将自己升级为Server
\
->ClientGmsImpl.determineCoord(List<PingRsp>) //查找协调者
->ClientGmsImpl.sendJoinMessage(Address, Address, boolean) //向协调者发出,GMS.GmsHeader.JOIN_REQ
sequenceDiagram
    participant GMS as GMS Protocol
    participant ClientImpl as ClientGmsImpl
    participant LowerLayer as Lower Protocol

    GMS->>GMS: new GMS()
    GMS->>GMS: initState()
    GMS->>GMS: becomeClient()
    GMS->>ClientImpl: init()
    
    GMS->>LowerLayer: down(Event.CONNECT)
    GMS->>ClientImpl: join(Address)
    ClientImpl->>ClientImpl: findInitialMembers()
    ClientImpl->>LowerLayer: down(Event.FIND_INITIAL_MBRS)
    
    alt 找到初始成员
        ClientImpl->>ClientImpl: determineCoord()
        ClientImpl->>ClientImpl: sendJoinMessage()
    else 未找到初始成员
        ClientImpl->>ClientImpl: becomeSingletonMember()
        ClientImpl->>GMS: up(Event.BECOME_SERVER)
    end

## EhCache 底层调度 JGroups 的配置解读

connect=TCP(start_port=7800;end_port=7800;bind_addr=192.168.20.118)://通讯底层协议
//start_port: find first available port starting at this port.起始端口
//end_port: maximum port to bind to.结束端口
//bind_addr: The interface (NIC) which should be used by this transport.绑定地址
TCPPING(initial_hosts=192.168.20.118[7800];port_range=1;timeout=5000;num_initial_members=1):
//initial_hosts: hosts to be contacted for the initial membership.初始节点
//port_range: number of ports to be probed for initial membership.初始端口范围
//timeout: 侦测节点等待时间
//num_initial_members: 需要的初始化节点数目
MERGE2(max_interval=60000;min_interval=20000)://发现其他节点协议
//min_interval: minimum time between executions of the FindSubgroups task
//max_interval: maximum time between executions of the FindSubgroups task,取interval是随机在两个值之间的值
FD(timeout=5000;shun=false)://失败侦测协议
//timeout: number of millisecs to wait for an are-you-alive msg,等待are-you-alive的时间
//shun: 回避,建议关闭,采用merge2协议来处理
VERIFY_SUSPECT(timeout=1500)://确认嫌疑协议
//timeout: number of millisecs to wait for an are-you-dead msg,等待are-you-dead的时间
pbcast.NAKACK(gc_lag=20;retransmit_timeout=3000)://消息确认协议
//gc_lag: number of msgs garbage collection lags behind. 消息垃圾收集深度,建议不要太大,在消息非常大时,容易出现OOM
//retransmit_timeout: time(s) to wait before requesting retransmission. 请求重传等待时间。
UNICAST(timeout=300,600,1200)://单播协议
//timeout: for AckSenderWindow: max time to wait for missing acks.动态超时时间。
pbcast.STABLE(desired_avg_gossip=20000)://广播稳定协议
//desired_avg_gossip: Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages. 平均20s发内部消息,设置为0来关闭
pbcast.GMS(join_timeout=5000;shun=false;print_local_addr=false)//成员管理协议
//join_timeout: 发送join消息等待时间
//shun: 回避,建议关闭,采用merge2协议来处理
//print_local_addr: 启动是否输出本机信息

配置建议:

  1. 在所有需要的节点上,均将 TCPstart_portend_port 设为一个值,且每个节点端口唯一。用以保证不会端口跳号。
  2. TCPPINGinitial_hosts 中,将所有节点均进行静态配置。只有静态配置完整了,MERGE2 才能正常工作。
  3. TCPPING 的超时 timeout,在查找初始节点时被请求数num_ping_requests 进行了等分,用以执行多次。默认num_ping_requests 是2。将 timeout 值设置得稍大点,能确保节点的找到。
  4. 关闭 FDGMSshun 选项。在采用了 MERGE2 协议后,不必配置,用 merge 代替 shun
  5. NAKACKgc_lag 需要细心设置,根据包的大小合理估计。默认值是 20
  6. 为防止节点因迟缓而被踢出,可以适当延长 FD 协议中 timeout 值。
  7. 许多操作是通过 timer 进行处理的。默认打开3个 timer。需要通过 -Djgroups.timer.num_threads 来设置。

某故障分析

  1. 重启节点后为什么无法加入到原集群组中

    根据连接 158:7801 的日志,说明其未单独建立集群,正在向当前集群协调者申请加入。
    根据代码、配置及日志,分析如下:

    1. 前次启动 158 时,由于某些特殊原因,比如 7800 处于 time_wait 状态,导致 158 占用了 7801 端口
    2. 158159 在出问题前,是一个完整集群,158:7801 是作为协调者的
    3. 158 出问题后,FD 协议侦测出了 158:7801 有问题,但由于 FD 协议被设置到了 VERIFY_SUSPECT 协议上层,没法交给 VERIFY_SUSPECT 协议去确认嫌疑。159 中协调者仍然保持为 158:7801,未将自己升级为协调者。
    4. 158 重启后,向 159 要来了集群信息,得到 158:7801 是协调者这一信息。于是向 158:7801 申请加入。
    5. 由于 158:7801 实际已消亡,所以,无法加入。158:7800 一直处于待加入集群态。
  2. 为什么说 158 单独建立了集群?

    如问题一答复,根据连接 158:7801 的日志,说明其未单独建立集群,正在向当前集群协调者申请加入。

  3. jgroups 集群成员丢失的判断依据

    1. 基于协议 FD/VERIFY_SUSPECTFD 协议需要在 VERIFY_SUSPECT 协议下层。
    2. FD 发送心跳包 are-you-alive,检查邻居存活情况。一旦侦测到失败,就向上发出 Event.SUSPECT
    3. VERIFY_SUSPECT 处理 FD 检查到心跳失败时向上发出的 Event.SUSPECT。会向该节点发出 are-you-dead,如果在超时时间内没返回,确认怀疑。进一步向上发出 Event.SUSPECT
    4. 但某系统配置有误,将 FD 协议放到了 VERIFY_SUSPECT 的上层,导致异常侦测存在问题。

某故障分析2

问题现象

某系统报日志大量滚动刷新输出,经确认,为 jgroups 日志输出

问题分析

前置知识背景

  1. jgroups 是基于 TCPPING 进行的集群发现,TCPPING 是基于 TCPPING 协议,用于发现集群中的节点,并返回节点的 IP 地址和 port,用于后续的 TCP 连接。
  2. TCPPING 协议需配置 initial_hosts 参数,用于指定集群中节点的 IP 地址和 port,用于后续的 TCP 连接。
  3. 配置在 initial_hosts 参数中的节点,被认为是集群中的 leader 节点,或者说初始节点。
  4. 如果只有一个节点,那么这个节点就是初始节点,否则,第一个节点作为初始节点,后续节点作为后续节点。

本次问题分析

  1. 使用了 name_masked_framework 框架,对 jgroups 进行了封装。
  2. name_masked_framework 的配置里,bindHosts 中配置的节点信息将被传递到 jgroupsTCPPING 协议中,作为 initial_hosts 信息。
  3. 根据现场得到信息,xxx_appyyy_app 的节点都配置了 bindHosts 参数,但 bindHosts 参数中只配置了一个节点,即本机:192.168.10.111:12335
  4. 这造成了一个严重问题,即在 xxx_appyyy_app 中,jgroups 集群中,只有第一个启动的进程,才会被认定为 leader(初始节点),另一个只能作为后续节点。
  5. 本次,通过分析,可发现 xxx_app 进程为初始节点,yyy_app 为后续节点。

推演问题过程

  1. xxx_app 进程停机,此时集群中只剩下一个进程,即 yyy_app 进程。该进程在集群中是后续节点,不具备维持 jgroups 集群能力。
  2. xxx_app 进程重启,此时 xxx_app 进程重新监听 12335 端口成为初始节点,构建 jgroups 集群。
  3. yyy_app 进程通过 TCPPING 探测到 12335 端口,尝试加入到 jgroups 集群中。但由于xxx_app 进程启动过程中未接受到任何组网包,不认为 yyy_app 进程是自己集群内成员,拒绝其发送的包。

复现

已通过最小代码集构建测试包,复现此问题。

配置解决方案

yyy_app 版本的配置与 xxx_app 版本的配置分离,分别配置,不公用配置即可

两种改法,1 隔离改法

# xxx_app
bindHost=192.168.10.111
bindPort=12335
bindHosts=192.168.10.111[12335]

# yyy_app
bindHost=192.168.10.111
bindPort=13335
bindHosts=192.168.10.111[13335]

两种改法,2 全部配置为初始节点

# xxx_app
bindHost=192.168.10.111
bindPort=12335
bindHosts=192.168.10.111[12335],192.168.10.111[12336]

# yyy_app
bindHost=192.168.10.111
bindPort=12335
bindHosts=192.168.10.111[12335],192.168.10.111[12336]