Vert.xはJVM上で動作するイベントドリブンでノンブロッキングな処理をシンプルに記述できるフレームワークです。

アプリケーションはJava以外の言語でも記述できます。JavaScript, Groovy, Ruby, Ceylonで記述できるようですが、パフォーマンスは劣化するみたいです。パフォーマンスを意識する場面での利用が多くなりそうなのでJavaで書くのが正解でしょうか。

概要は公式のドキュメントをつまみ食いして、合わせて参考に記述したものをさらさらと読みました。
Introduction to Vert.xが最も参考になったと思います。

アーキテクチャ

以下のような例を使って説明します。
ServerVerticleでhttpリクエストを受け取ってEventBusを経由してメッセージをWorkerVerticleに送りWorkerVerticleで非同期に処理します。

ネットワークとプロトコル

Nettyを使っているのでNettyと同じレベルの処理が可能です。ここではhttpを受けるserverを作成しています。

Event loop thread

Event loop thread上で動作するVerticleはブロックする処理(例えばJDBC接続)はできません。ブロックする処理を記述するとBlockしているとExeptionがでます。ブロックする処理はWorker pool上にあるWorkerVerticleで処理する必要があります。

デフォルトでは2*CPU個のスレッドが割り当てられます。

例えば以下のように記述します。

ServerVerticle.java
public class ServerVerticle extends AbstractVerticle {
@Override
public void start(Future<Void> fut) {
DeploymentOptions options = new DeploymentOptions().setWorker(true);
vertx.deployVerticle(MessagePublishVerticle.class.getName(),options);
vertx.createHttpServer().requestHandler(req -> {
if(req.method() == HttpMethod.POST){
if(req.headers().get("hogehoge") != null){
req.bodyHandler(body -> {
JsonObject obj = new JsonObject()
.put("User-Agent", req.headers().get("User-Agent"))
.put("Post-Body", body.toString());
vertx.eventBus().send("kafka.pablish.message", obj);
});
}
req.response().end();
} else {
req.response().end("Please send post Method...");
}
}).listen(8080);
}
}

EventBus

各Verticle間の通信を行うための軽量分散型メッセージングシステムです。通信方式としてpublish-subscribe, point-to-point, request-responseの3つのメッセージのやり取りがサポートされています。また、送信されたメッセージをラウンドロビンで転送できたりします。Verticle間の通信の複雑さがこのEventBusでかなり吸収されているので、利用者側はシンプルにVerticle間の通信ができるようになっています。

例えば、以下のようにやり取りします。

vertx.eventBus().send("kafka.pablish.message", "送信するオブジェクト");

vertx.eventBus().consumer("kafka.pablish.message", message -> {
//受け取ったmessageに対する処理
});

1:1でメッセージをやり取りしたいときはlocalConsumerとか使うといいのかな。

Worker Pool

EventLoopで受けたイベント+メッセージを受け取り非同期で処理するためのWockerを利用できます。例えば、EventLoop上でできないBlockする処理を記述することが可能です。デフォルトでは20スレッドが割り当てられます。

例えば以下のように記述します。

WorkerVerticle.java
public class WorkerVerticle extends AbstractVerticle {
KafkaProducer<String, String> producer = null;
@Override
public void start() throws Exception {
Properties props = kafkaConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
vertx.eventBus().consumer("kafka.pablish.message", message -> {
JsonObject obj = (JsonObject) message.body();
producer.send(new ProducerRecord<>("test", obj.toString()));
});
}
@Override
public void stop() throws Exception {
if(producer != null) producer.close();
}
private Properties kafkaConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", "10.10.0.5:2181");
props.put("bootstrap.servers", "10.10.0.5:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}

まとめ

簡単に非同期処理ができるのでいい感じです。また、Java8から追加されたラムダ式が使えるのですっきりと記述できます。処理が複雑になるとネストが深くなりそうですが。デフォルトの設定でも十分にパフォーマンスが出るように作られているようです。

終わり。

参考

  1. http://www.slideshare.net/wuman/development-with-vertx-an-eventdriven-application-framework-for-the-jvm
  2. http://acro-engineer.hatenablog.com/entry/2013/08/10/131203
  3. http://uehaj.hatenablog.com/entry/2013/06/03/225117
  4. http://www.slideshare.net/gr8conf/vertx-introduction