Skip to content

Latest commit

 

History

History
317 lines (242 loc) · 11.6 KB

规则引擎.md

File metadata and controls

317 lines (242 loc) · 11.6 KB

环境准备

  • release-3.2分支源码

描述

规则引擎是Thingsboard的核心部分,实时数据经过规则引擎进行处理并送至目的地。规则引擎的核心是Actor模型,在2.x版本中,Actor模型使用Akka实现,在3.x中官方自行实现。

开始分析之前,有必要先了解下Actor模型,维基百科的定义如下:

演员模型推崇的哲学是“一切皆是演员”,这与面向对象编程的“一切皆是对象”类似。 演员是一个运算实体,响应接收到的消息,相互间是并发的:

  • 发送有限数量的消息给其他演员;
  • 创建有限数量的新演员;
  • 指定接收到下一个消息时要用到的行为。 Actor Model

分析

Actor模型实现

系统中与Actor模型相关类都在工程common/actor下,几个核心类说明如下:

  • TbActorSystem Actor系统接口,Actor系统类实现该接口
  • TbActor Actor接口,所有Actor需直接或间接实现该接口
  • TbActorCreator Actor创建接口,所有Actor创建器直接或间接实现该接口
  • TbActorRef Actor句柄接口,使用TbActorCreator创建Actor后返回此句柄,通常指向Actor的邮箱。
  • TbActorCtx Actor上下文接口,继承TbActorRef接口。
  • AbstractTbActor 抽象Actor,实现TbActor接口。
  • DefaultTbActorSystem Actor系统,用于Dispatcher的创建删除、Actor的创建查找和Actor之间消息传递等。
  • Dispatcher 调度器,用于调度Actor的创建或消息分发。
  • TbActorMailbox 邮箱,实现TbActorCtx接口,存储消息到队列并使用调度器处理队列中的消息。

类继承关系图如下: 类继承

基于以上类,实现一个Actor代码如下:

public class TbMyActor extends AbstractTbActor {

    public TbMyActor() {
    }
    
    @Override
    public boolean process(TbActorMsg msg) {
        //process some message
        return false;
    }
    /**
     * use this to create Actor
     */
    public static class ActorCreator implements TbActorCreator {

        public ActorCreator() {
        }

        @Override
        public TbActorId createActorId() {
            return new TbEntityActorId(new TenantId(EntityId.NULL_UUID));
        }

        @Override
        public TbActor createActor() {
            return new TbMyActor();
        }
    }

}

类继承关系如下: 类继承

引擎初始化

回到正题,DefaultActorService构造一个使用Actor模型系统的规则引擎,分为两个阶段:

  1. 类初始化,方法:initActorSystem
//DefaultActorService 84
log.info("Initializing actor system.");
actorContext.setActorService(this);
TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
//新建DefaultTbActorSystem对象
system = new DefaultTbActorSystem(settings);
//创建线程池用于后续异步处理消息
system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));

actorContext.setActorSystem(system);
//创建整个Actor模型的根
appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
actorContext.setAppActor(appActor);
//创建状态Actor,也是一个根,用于统计状态
TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
actorContext.setStatsActor(statsActor);

log.info("Actor system initialized.");
  1. 应用准备完成,方法为onApplicationEvent
//DefaultActorService 120
log.info("Received application ready event. Sending application init message to actor system");
//给顶层AppActor邮箱发送消息AppInitMsg
appActor.tellWithHighPriority(new AppInitMsg());

AppActor收到消息后,在doProcess方法中进行处理

//AppActor 67
if (!ruleChainsInitialized) {
  //初始化多个租户Actors
  initTenantActors();
  ruleChainsInitialized = true;
  if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
    log.warn("Rule Chains initialized by unexpected message: {}", msg);
  }
}

TenantActor在init阶段进行租户下规则链RuleChainActor创建

// TenantActor 88
if (isRuleEngineForCurrentTenant) {
    try {
        if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenantProfile.isIsolatedTbRuleEngine())) {
            if (apiUsageState.isReExecEnabled()) {
                log.info("[{}] Going to init rule chains", tenantId);
                //规则链节点初始化
                initRuleChains();
            } else {
                log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
            }
        } else {
            isRuleEngineForCurrentTenant = false;
        }
    } catch (Exception e) {
        cantFindTenant = true;
    }
}

RuleChainActor在init阶段,创建RuleChainActorMessageProcessor并调用其start,进行规则节点RuleNodeActor的创建

//RuleChainActorMessageProcessor 100
if (!started) {
    RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
    if (ruleChain != null) {
        List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
        log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
        // Creating and starting the actors;
        for (RuleNode ruleNode : ruleNodeList) {
            log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
            //创建规则节点
            TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
            //加入到节点集合中
            nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
        }
        //初始化节点路由
        initRoutes(ruleChain, ruleNodeList);
        started = true;
    }
} else {
    onUpdate(context);
}

RuleNodeActor在init阶段,创建RuleNodeActorMessageProcessor并调用其start,进行TbNode的创建

//RuleNodeActorMessageProcessor 62
//根据类型创建TbNode实例
tbNode = initComponent(ruleNode);
if (tbNode != null) {
    state = ComponentLifecycleState.ACTIVE;
}

当与设备相关消息开始上传时,TenantActor还会初始化DeviceActor

//TenantActor 153
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
    //传递消息给DeviceActor,如果没有则创建
    onToDeviceActorMsg((DeviceAwareMsg) msg, false);
    break;

形成的结构如下:

RuleEngine结构

引擎消息传输

完成规则初始化后,规则引擎接受消息传输,以普通设备上传时序数据并存储为例,规则引擎处理的核心处理流程如下:

//ActorSystemContext 561
//传递消息到AppActor邮箱中
appActor.tell(tbActorMsg);

//AppActor 84
//根据消息类型转换消息为QueueToRuleEngineMsg,调用onQueueToRuleEngineMsg方法
onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);

//AppActor 140
//创建或获取租户Actor邮箱,并传递消息
getOrCreateTenantActor(msg.getTenantId()).tell(msg);

//TenantActor 150
//根据消息类型转换消息为QueueToRuleEngineMsg,调用onQueueToRuleEngineMsg方法
onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);

//TenantActor 185
//获取根规则链Actor邮箱,并传递消息
getRootChainActor().tell(msg);

//RuleChainActor 55
//根据消息类型转换消息为QueueToRuleEngineMsg,使用处理器RuleChainActorMessageProcessor处理消息
processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);

//RuleChainActorMessageProcessor 215
//如果消息中未指定规则节点,targetCtx为第一个节点邮箱,否则为指定节点邮箱
pushMsgToNode(targetCtx, msg, "");

//RuleChainActorMessageProcessor 338
//新建RuleChainToRuleNodeMsg消息,并向规则节点邮箱发送消息
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));

//RuleNodeActor 60
//根据消息类型转换消息为RuleChainToRuleNodeMsg,调用onRuleChainToRuleNodeMsg处理该消息
onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);

//RuleNodeActor 94 
//使用处理器RuleNodeActorMessageProcessor处理消息
//processor.onRuleChainToRuleNodeMsg(msg);

//RuleNodeActorMessageProcessor 136
//调用规则节点实例处理消息
tbNode.onMsg(msg.getCtx(), msg.getMsg());


//TbDeviceProfileNode -> TbMsgTypeSwitchNode 消息处理流程
//TbDeviceProfileNode 135
//获取或创建设备状态DeviceState,处理消息
deviceState.process(ctx, msg);

//DeviceState 140
//处理遥测数据
//stateChanged = processTelemetry(ctx, msg);

//DeviceState 260
//调用上下文tellSuccess方法处理消息
//ctx.tellSuccess(msg);

//DefaultTbContext 103
//调用tellNext处理关联关系为SUCCESS的消息。
tellNext(msg, Collections.singleton(TbRelationTypes.SUCCESS), null);

//DefaultTbContext 121
//新建消息RuleNodeToRuleChainTellNextMsg,并传递给所在规则链Actor邮箱。
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));

//RuleChainActor 58
//根据消息类型转换消息为RuleNodeToRuleChainTellNextMsg,使用处理器RuleChainActorMessageProcessor处理消息
processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);

//RuleChainActorMessageProcessor 252
//根据消息来源编号originatorNodeId(一般是上一个节点的Id)和关联类型过滤关联联系
List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
                    .filter(r -> contains(relationTypes, r.getType()))
                    .collect(Collectors.toList());

//RuleChainActorMessageProcessor 282
//如果关联关系为1,调用pushToTarget到关联目标实体(一般是下一个节点,也有可能是下一个规则链)
pushToTarget(tpi, msg, relation.getOut(), relation.getType());

//RuleChainActorMessageProcessor 304
//获取关联目标实体的Actor邮箱,调用pushMsgToNode方法处理消息
pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);

//RuleChainActorMessageProcessor 338
//新建消息RuleChainToRuleNodeMsg,向目标实体Actor邮箱发送该消息
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));


//TbMsgTypeSwitchNode 101
//计算relationType(这里是`Post telemetry`),调用上下文tellNext处理消息
ctx.tellNext(msg, relationType);

//TbMsgTypeSwitchNode-> TbMsgTimeseriesNode 消息处理流程类似

核心流程时序图(省掉一些非核心时序)如下:

规则引擎消息传输

为了防止蒙圈,提供一张核心流程示意图:

Actor交互示意图

TIPS

  • Actor模型 wiki
  • 需要注意,在微服务架构下,Core服务和RuleEngine服务都拥有自己的Actor模型实现,Core包含AppActorTenantActor以及DeviceActor,RuleEngine包含AppActorTenantActor 、RuleEngine以及RuleNodeActor