Thingsboard ThingsBoard 二次开发之源码分析 4-启动分析 3

sds · 2020年09月29日 · 354 次阅读
本帖已被设为精华帖!

thingsboard 聚集地

Thingsboard 话题讨论区:https://www.iotschool.com/topics/node8

欢迎大家加入 thingsboard 二次开发讨论群:121202538

thingsboard交流QQ群 121202538

#ThingsBoard 源码分析 4-启动分析 3

以下的分析环境基于默认配置

1. Actor 模型

首先搬用官网关于 Actor 模型的介绍:

actor架构图

  • The brief description of each actor’s functionality is listed below:

  • App Actor - responsible for management of tenant actors(负责管理租户 Actor). An instance of this actor is always present in memory.

  • Tenant Actor - responsible for management of tenant device & rule chain actors(负责管理租户设备,规则链 Actor). An instance of this actor is always present in memory.

  • Device Actor - maintain state of the device: active sessions, subscriptions, pending RPC commands(处理设备状态,激活 session,订阅,RPC 请求), etc. Caches current device attributes in memory for performance reasons. An actor is created when the first message from the device is processed. The actor is stopped when there is no messages from devices for a certain time.

  • Rule Chain Actor - process incoming messages and dispatches them to rule node actors(处理消息,并将消息分发给 node Actor). An instance of this actor is always present in memory.

  • Rule Node Actor - process incoming messages, and report results back to rule chain actor(处理消息,并将处理的消息返回给规则链 Actor). An instance of this actor is always present in memory.

2. Actor 启动分析

DefaultActorService简析

图片看不清楚的,请点击这里:

DefaultActorService@PostConstruct标记的initActorSystem()开启了 actor 的启动流程:

public void initActorSystem() {
    log.info("Initializing actor system.");
    actorContext.setActorService(this);
    TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
    system = new DefaultTbActorSystem(settings);
    //创建不同actor执行的线程池
    system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
    system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
    system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
    system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));

    actorContext.setActorSystem(system);
    //创建AppActor详细分析见①createActor分析
    appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
    actorContext.setAppActor(appActor);
    //创建statsActor,暂时不关注
    TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
    actorContext.setStatsActor(statsActor);

    log.info("Actor system initialized.");
}

DefaultTbActorSystem类中方法createActor分析

private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {
    //获取initActorSystem时对应的线程池
    Dispatcher dispatcher = dispatchers.get(dispatcherId);
    if (dispatcher == null) {
        log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);
        throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!");
    }
    //creator有不同的实现如AppActor,TenantActor,RuleChainActor, RuleNodeActor等
    // AppActor, 则使用固定的NULL_UUID,UUID.fromString("13814000-1dd2-11b2-8080-808080808080")作为Id
    // TenantActor 使用creator提供的tenantId作为Id
    // RuleChainActor 使用creator提供的ruleChain作为Id
    // RuleNodeActor  使用creator提供的ruleNodeId作为Id
    TbActorId actorId = creator.createActorId();
    //缓存使用,将已经创建的放入缓存
    TbActorMailbox actorMailbox = actors.get(actorId);
    if (actorMailbox != null) {
        log.debug("Actor with id [{}] is already registered!", actorId);
    } else {
        Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());
        actorCreationLock.lock();
        try {
            actorMailbox = actors.get(actorId);
            if (actorMailbox == null) {
                log.debug("Creating actor with id [{}]!", actorId);
                //正式创建Actor,也就是调用构造函数
                TbActor actor = creator.createActor();
                //只允许AppActor的parent是空
                TbActorRef parentRef = null;
                if (parent != null) {
                    parentRef = getActor(parent);
                    if (parentRef == null) {
                        throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!");
                    }
                }
                //使用actor创建TbActorMailbox对象
                TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);
                //放入缓存
                actors.put(actorId, mailbox);
                //用各自的线程池去进行初始化操作,具体的初始化操作由每种actor的init方法实现详细分析如下②③④
                mailbox.initActor();
                actorMailbox = mailbox;
                if (parent != null) {
                    //缓存,父子关系缓存
                    parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId);
                }
            } else {
                log.debug("Actor with id [{}] is already registered!", actorId);
            }
        } finally {
            actorCreationLock.unlock();
            actorCreationLocks.remove(actorId);
        }
    }
    return actorMailbox;
}
  • TenantActorinit方法:

如果此应用不是以某一个 tennant 启动的话或者isolatedTbRuleEngine是 false,调用父类RuleChainManagerActorinitRuleChains()方法查询该租户下的rule_chain表。开始创建RuleChainActor, 创建RuleChainActor的 parent 是当前TenantActor,创建的时候,又会调用到DefaultTbActorSystemcreateActor方法;

  • RuleChainActor实现了ComponentActorComponentActorinit方法调用了createProcessor(TbActorCtx ctx)(由子类实现,创建了RuleChainActorMessageProcessor实例)和initProcessor(TbActorCtx ctx)(调用processor.start也即RuleChainActorMessageProcessorstart方法,该方法查询查询relation表,条件为 RULE_CHAIN 和 ruleChainId,再根据 to_id 去rule_node表获取RuleNode;1. 根据 ruleNodeList 开始初创建RuleNodeActor2.initRoutes()查询relation,rule_noderule_chain表,构造 nodeRoutes 的数据,并设置 firstRuleNode);

  • RuleNodeActor实现了ComponentActorComponentActorinit方法调用了createProcessor(TbActorCtx ctx)(由子类实现,创建了RuleNodeActorMessageProcessor实例)和initProcessor(TbActorCtx ctx),(调用processor.start也即RuleNodeActorMessageProcessorstart方法,使用反射创建 ruleNode 的实例,并按照数据库 configuration 的配置,调用init方法);

DefaultActorService@EventListener(ApplicationReadyEvent.class)标记的onApplicationEvent开始进一步初始化流程:

调用appActor.tellWithHighPriority(new AppInitMsg());AppInitMsg推入AppActor的消息队列里,然后尝试处理这一消息,最终会交给AppActordoProcess方法进行处理,由于还未ruleChainsInitialized从未进行复制,此时 AppActor 开始初始化 TennatActor,查询tenant表对所有 tenant 创建TenantActor;

总结

  1. initActorSystem创建AppActorStatsActor两个 Actor,并未创建其他的 Actor;

  2. 在收到ApplicationReadyEvent的时候,由于之前没有初始化,所以会初始化该应用的所有TenantActor, 根据②③④点分析,循环切递归的创建了所有的RuleChainActor, RuleNodeActor;

  3. 关于TbActorMailbox的理解:mailbox 理解为一个信箱,里面有一些信件 (即入队列的消息),这些信件有一些是高优先级,有些是普通优先级的,每次取信件的时候,都先看有没高优先级的,先处理高优先级的信件,再处理普通优先级的信件。那么信件处理人是谁呢?信件的处理人就是每一个 Actor.实际上的信件处理方法都是每一个 Actor 的doProcess(TbActorMsg msg)方法

  4. 我们讨论的每一个RuleNodeActor就是在前端 RULE CHAINS 界面看到的每一个节点。

sds 将本帖设为了精华贴 09月29日 14:40
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册