转载

10、Remote Actor

虽然Akka在单机上可以运行上百万的Actor,但出于容错、负载均衡、灰度发布、提高并行度等等原因,我们仍然需要能在多个不同的服务器上运行Actor。所以Akka提供了akka-remoting的扩展包,屏蔽底层网络传输的细节,让上层以及其简单的方式使用远程的Actor调度。
官方文档:https://doc.akka.io/docs/akka/current/remoting.html

适用场景

remoting的存在其实是为akka cluster做底层支持的,通常并不会直接去使用remoting的包。但为了了解cluster的底层原理,还是有必要看下remoting
同时,remoting被设计为Peer-to-Peer(点对点通讯)而非Client-Server,所以不适用于基于后者的系统开发,比如我们无法在一个provider为local的Actor里去查找一个remote actor发送消息,必须两者均为remote actor,才满足对等。

设计

Akka的所有设计,都是考虑了分布式的:所有Actor的交互都是基于事件,所有的操作都是异步的。

基本例子

Akkaremoting完全配置化了,使用时几乎只需要修改配置文件,除非自定义,否则不需要动一行代码。
remoting包提供了两个功能:

  • 查找一个已存在的远程Actor
  • 在指定的远程路径上创建一个远程Actor

添加依赖

在引入akka actor的基本依赖后,再加上remoting的依赖

<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-remote_2.12</artifactId>
  <version>2.5.17</version>
</dependency>

配置

在一个Akka项目中启用remote功能的话,最基本需要在application.conf(Akka默认的配置文件名)中启用如下配置:

akka {
  actor {
    provider = remote
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
 }
}

基本配置包含如下四点:

  • providerlocal变成remote
  • enabled-transports指定传输的实现
  • hostname 指定当前Actor底层网络监听组件所需监听的主机名,如果不指定,默认会调用InetAddress.getLocalHost().getHostAddress()来获取当前主机的IP
  • port 指定当前Actor底层网络监听组件所需监听的端口,如果设置为0,则会生成一个随机的端口

由于要测试下本地去寻找远程actor,所以本文的代码例子中,用remote.conf作为配置文件名

注意:如果在同一个主机上启动多个远程Actor,那么port一定要不同。因为远程Actor的底层会启动一个网络监控组件,该组件会去监听指定IP或域名的指定端口。如果都相同,肯定会有一个绑定失败。

查找一个远程Actor

我们创建一个远程Actor,一会儿去查找它。注意,这里加载了remote.conf,但覆盖了端口为2551,目的是在本地模拟一个远端的Actor。如果觉得在本地起不好理解,就可以找一台服务器,把akka.remote.netty.tcp.hostname也覆盖掉换成服务器的IP,或者干脆另起一个配置文件。

public class ToFindRemoteActor extends AbstractActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    @Override
    public void preStart() throws Exception {
        log.info("ToFindRemoteActor is starting");
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, msg->{
                    log.info("Msg received: {}", msg);
                })
                .build();
    }

    public static void main(String[] args) {
        Config config = ConfigFactory.parseString(
                "akka.remote.netty.tcp.port=" + 2551)
                .withFallback(ConfigFactory.load("remote.conf"));

        // Create an Akka system
        ActorSystem system = ActorSystem.create("sys", config);

        // Create an actor
        system.actorOf(Props.create(ToFindRemoteActor.class), "toFind");
    }
}

启动后,可以看到控制台有日志打印出来:

[INFO] [10/26/2018 11:54:33.684] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/26/2018 11:54:34.198] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://sys@127.0.0.1:2551]
[INFO] [10/26/2018 11:54:34.200] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://sys@127.0.0.1:2551]
[INFO] [10/26/2018 11:54:34.363] [sys-akka.actor.default-dispatcher-7] [akka://sys/user/toFind] ToFindRemoteActor is starting

这时,我们先尝试在一个本地进程里去查找这个Actor:

public class Main1 {
    public static void main( String[] args ) {
        ActorSystem system = ActorSystem.create("main1");
        LoggingAdapter log = Logging.getLogger(system, Main2.class);
        ActorSelection toFind = system.actorSelection("akka.tcp://sys@127.0.0.1:2551/user/toFind");
        toFind.tell("hello", ActorRef.noSender());
    }
}

注意,这里我没有提供application.conf,而且也没有指定其他的配置文件!所以这里的ActorSystem起的完全是本地模式。我们运行一下,看看是远端的Actor是否会打印hello呢?

[INFO] [10/26/2018 14:02:27.661] [local-akka.actor.default-dispatcher-2] [akka://local/deadLetters] Message [java.lang.String] without sender to Actor[akka://local/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://local/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'

结果给出了这样的日志,说明并没有发送成功。再次验证了上面提到的Akka Remote的Peer-to-Peer设计,必须要求对等,两边都是remote

好了,回到正轨上,我们来看看如何正确的去寻找一个远端actor并发送消息。

public class Main2 {
    public static void main(String[] args) {
        Config config = ConfigFactory.load("remote.conf");
        // Create an Akka system
        ActorSystem system = ActorSystem.create("main2", config);

        // Find remote actor
        ActorSelection toFind = system.actorSelection("akka.tcp://sys@127.0.0.1:2551/user/toFind");
        toFind.tell("hello", ActorRef.noSender());
    }
}

这里加载了remote.conf,启用remote provider。可以在ToFindRemoteActor的控制台有如下日志:

[INFO] [10/26/2018 14:12:11.376] [sys-akka.actor.default-dispatcher-4] [akka://sys/user/toFind] Msg received: hello

说明找到且正常收到了消息。

创建一个远程的Actor

在Main2里,我们相当于起了一个监听着127.0.0.1:2552ActorSystem,那我们把Main2当作远程系统(如果觉得127.0.0.1不太好理解,可以把它打包放到其他服务器,并指定hostname为这个服务器的IP),在当前机器去尝试在Main2这个远端起一个Actor。
远程Actor代码如下:

public class ToCreateRemoteActor extends AbstractActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    @Override
    public void preStart() throws Exception {
        log.info("ToCreateRemoteActor is starting");
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, msg->{
                    log.info("Msg received: {}", msg);
                })
                .build();
    }
}

创建配置文件如下:

akka {
  actor {
    provider = "remote"
    deployment {
      /toCreateActor {
        remote = "akka.tcp://main2@127.0.0.1:2552"
      }
    }
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2553
    }
  }
}

其中toCreateActor就是指定远端要启动的Actor的别名,在本地的ActorSystem靠这个别名去启动。注意指定provider为remote!

public class Main3 {

    public static void main(String[] args) {
        Config config = ConfigFactory.load("create_remote.conf");
        // Create an Akka system
        ActorSystem system = ActorSystem.create("main3", config);
        ActorRef actor = system.actorOf(Props.create(ToCreateRemoteActor.class), "toCreateActor");
        actor.tell("I'm created!", ActorRef.noSender());
    }
}

可以看到这里就尝试去创建一个名字叫toCreateActor的Actor,而这个名字在配置文件中定义了是远端的,Akka会自动尝试去远端创建。
启动一下,看到Main3的日志:

[INFO] [10/26/2018 15:25:42.794] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/26/2018 15:25:43.364] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://main3@127.0.0.1:2553]
[INFO] [10/26/2018 15:25:43.365] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://main3@127.0.0.1:2553]

检查Main2的日志,会发现远程Actor创建的信息:

[INFO] [10/26/2018 15:25:43.774] [main2-akka.actor.default-dispatcher-17] [akka://main2/remote/akka.tcp/main3@127.0.0.1:2553/user/toCreateActor] ToCreateRemoteActor is starting
[INFO] [10/26/2018 15:25:43.775] [main2-akka.actor.default-dispatcher-16] [akka://main2/remote/akka.tcp/main3@127.0.0.1:2553/user/toCreateActor] Msg received: I'm created!

到这,一个远端的Actor就被创建出来了。
不过,事情就这样结束了吗?思考一个问题:查询这种远端创建的Actor,跟之前那个远端自己起来的Actor,方式一样吗?\
参考Main2,我们再写一个Main4来尝试查询并发送消息。那有一个问题,toCreateActor的地址到底该选哪个?按理说,应该是akka.tcp://main2@127.0.0.1:2552/user/toCreateActor。带着问题,我们试试看

public class Main4 {

    public static void main(String[] args) {
        Config config = ConfigFactory.parseString(
                "akka.remote.netty.tcp.port=" + 0)
                .withFallback(ConfigFactory.load("remote.conf"));
        // Create an Akka system
        ActorSystem system = ActorSystem.create("main4", config);

        // Find remote actor
        ActorSelection toFind = system.actorSelection("akka.tcp://main2@127.0.0.1:2552/user/toCreateActor");
        toFind.tell("I'm alive!", ActorRef.noSender());
    }
}

Main2中,会打印

[INFO] [10/26/2018 15:42:25.508] [main2-akka.actor.default-dispatcher-16] [akka://main2/user/toCreateActor] Message [java.lang.String] without sender to Actor[akka://main2/user/toCreateActor] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://main2/user/toCreateActor]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

失败了。。。。。。
仔细看,Main2里面创建出来的Actor的Path是
akka://main2/remote/akka.tcp/main3@127.0.0.1:2553/user/toCreateActor
而远端自己起的Actor地址是:
akka://sys/user/toFind
所以,正确的Path应该是`akka.tcp/main3@127.0.0.1:2553/user/toCreateActor`
修改后测试一下,会发现Main2中打印

[INFO] [10/26/2018 15:25:58.615] [main2-akka.actor.default-dispatcher-17] [akka://main2/remote/akka.tcp/main3@127.0.0.1:2553/user/toCreateActor] Msg received: I'm alive!

所以,可以得出一个看上去不是很合理的结论:
虽然RemoteActor是创建在远程机器上,但如果想要查询它,还得向创建者发请求。

Artery

Artert是Akka为新版的remote包起的代号。目前是共存状态,但被标记为may change状态,仅UDP模式可以用于生产。

配置与原来的remote略有不同

akka {
  actor {
    provider = remote
  }
  remote {
    artery {
      enabled = on
      transport = aeron-udp
      canonical.hostname = "127.0.0.1"
      canonical.port = 25520
    }
  }
}

注意: 一旦启用了artery,所有Actor的地址协议就不用再带上tcp了,比如akka.tcp://127.0.0.1:2551/user/testActor就变成akka://127.0.0.1:2551/user/testActor

与原先相比,多了一个enabled选项控制artery是否启动。
相比原来的remote,Artery的变化主要集中在高吞吐、低延迟场景下提高性能上,包括用Akka Streams TCP/TLS替代了原来的Netty TCP,并新增了基于Aeron的UDP协议模式,以及对直接写java.nio.ByteBuffer的支持,大小消息分channel发等等。

原文地址

http://edisonxu.com/2018/10/30/akka-remote-actor.html

正文到此结束
本文目录