SkillAgentSearch skills...

EventStore.JVM

Event Store JVM Client

Install / Use

/learn @kurrent-io/EventStore.JVM
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

[!WARNING] DEPRECATION NOTICE:

  • EventStoreDB version 23.10.x is the last OSS version to support the TCP-protocol-based client.
  • This project is no longer maintained. We recommend moving to EventStoreDB-Client-Java for ongoing updates and support.

Event Store JVM Client Continuous Integration Coverage Status Version

<table border="0"> <tr> <td><a href="http://www.scala-lang.org">Scala</a> </td> <td>2.13.7 / 2.12.15</td> </tr> <tr> <td><a href="http://akka.io">Akka</a> </td> <td>2.6.17</td> </tr> <tr> <td><a href="https://eventstore.org">Event Store</a></td> <td>v5.x, v20.x, v21.x, v22.x, and v23.x are supported</td> </tr> </table>

Please note that the TCP protocol is not supported on EventStoreDB v24.2 and higher. Only the versions listed above are compatible.

We have two APIs available:

  • Calling methods on EsConnection

We use scala.concurrent.Future for asynchronous calls; however, it is not very friendly for Java users. To make life easier for Java developers without reinventing the wheel, we suggest using the tools provided by the Akka team. Check it out:

// Java: create an EsConnection from the actor system and read event number 0 of "my-stream".
final EsConnection connection = EsConnectionFactory.create(system);
final Future<Event> future    = connection.readEvent("my-stream", new EventNumber.Exact(0), false, null);
// Scala: create an EsConnection and apply it to a ReadEvent message for the first event of "my-stream".
val connection = EsConnection(system)
val future     = connection(ReadEvent(EventStream.Id("my-stream"), EventNumber.First))
  • Sending messages to eventstore.akka.tcp.ConnectionActor
// Java: start a ConnectionActor and send it a ReadEvent built for the first event of "my-stream".
final ActorRef connection = system.actorOf(ConnectionActor.getProps());
final ReadEvent readEvent = new ReadEventBuilder("my-stream").first().build();
// A null sender is passed here, so no actor is registered to receive the reply.
connection.tell(readEvent, null);
// Scala: start a ConnectionActor and send it a ReadEvent for the first event of "my-stream".
val connection = system.actorOf(ConnectionActor.props())
connection ! ReadEvent(EventStream.Id("my-stream"), EventNumber.First)

Setup

Sbt

libraryDependencies += "com.geteventstore" %% "eventstore-client" % "7.4.0"

Maven

<dependency>
    <groupId>com.geteventstore</groupId>
    <artifactId>eventstore-client_${scala.version}</artifactId>
    <version>7.4.0</version>
</dependency>

Java examples

Read event

import java.net.InetSocketAddress;
import akka.actor.*;
import akka.actor.Status.Failure;
import akka.event.*;
import eventstore.j.*;
import eventstore.core.*;
import eventstore.akka.Settings;
import eventstore.akka.tcp.ConnectionActor;

/**
 * Reads the first event of "my-stream" by sending a {@code ReadEvent} message to a
 * {@code ConnectionActor}, and logs the reply from a dedicated result actor.
 */
public class ReadEventExample {

    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create();
        // Connect to 127.0.0.1:1113 using the admin/changeit credentials.
        final Settings settings = new SettingsBuilder()
                .address(new InetSocketAddress("127.0.0.1", 1113))
                .defaultCredentials("admin", "changeit")
                .build();
        final ActorRef connection = system.actorOf(ConnectionActor.getProps(settings));
        final ActorRef readResult = system.actorOf(Props.create(ReadResult.class));

        final ReadEvent readEvent = new ReadEventBuilder("my-stream")
                .first()
                .resolveLinkTos(false)
                .requireMaster(true)
                .build();

        // readResult is passed as the sender, so the reply is delivered to that actor.
        connection.tell(readEvent, readResult);
    }


    /**
     * Receives the outcome of the read: logs the event on success or the cause on failure,
     * then terminates the actor system in both cases so the example can exit.
     */
    public static class ReadResult extends AbstractActor {
        final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(ReadEventCompleted.class, m -> {
                        final Event event = m.event();
                        log.info("event: {}", event);
                        getContext().system().terminate();
                    })
                    .match(Failure.class, f -> {
                        // Log whatever cause was delivered. Casting to EsException would throw a
                        // ClassCastException for any other failure type and skip the shutdown below.
                        final Throwable cause = f.cause();
                        log.error(cause, cause.toString());
                        getContext().system().terminate();
                    })
                    .build();
        }
    }
}

Write event

import java.util.UUID;
import akka.actor.*;
import akka.event.*;
import eventstore.j.*;
import eventstore.core.*;
import eventstore.akka.tcp.ConnectionActor;

/**
 * Writes a single event to "my-stream" by sending a {@code WriteEvents} message to a
 * {@code ConnectionActor}, and logs the reply from a dedicated result actor.
 */
public class WriteEventExample {

    public static void main(String[] args) {

        final ActorSystem system   = ActorSystem.create();
        final ActorRef connection  = system.actorOf(ConnectionActor.getProps());
        final ActorRef writeResult = system.actorOf(Props.create(WriteResult.class));

        // Event of type "my-event" with a random id, string data and string metadata.
        final EventData event = new EventDataBuilder("my-event")
                .eventId(UUID.randomUUID())
                .data("my event data")
                .metadata("my first event")
                .build();

        final WriteEvents writeEvents = new WriteEventsBuilder("my-stream")
                .addEvent(event)
                .expectAnyVersion()
                .build();

        // writeResult is passed as the sender, so the reply is delivered to that actor.
        connection.tell(writeEvents, writeResult);
    }

    /**
     * Receives the outcome of the write: logs the returned range and position on success or
     * the cause on failure, then terminates the actor system in both cases.
     */
    public static class WriteResult extends AbstractActor {

        final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(WriteEventsCompleted.class, m -> {
                        log.info("range: {}, position: {}", m.numbersRange(), m.position());
                        getContext().system().terminate();
                    })
                    .match(Status.Failure.class, f -> {
                        // Log whatever cause was delivered rather than casting to EsException,
                        // which would throw a ClassCastException for any other failure type.
                        final Throwable cause = f.cause();
                        log.error(cause, cause.toString());
                        // Shut down on failure too; otherwise the actor system keeps the JVM alive.
                        getContext().system().terminate();
                    })
                    .build();
        }

    }
}

Subscribe to All

import java.io.Closeable;
import akka.actor.ActorSystem;
import eventstore.j.*;
import eventstore.core.IndexedEvent;
import eventstore.akka.SubscriptionObserver;

/**
 * Subscribes to all events through an {@code EsConnection} and logs every callback
 * of the subscription via the actor system's logger.
 */
public class SubscribeToAllExample {
    public static void main(String[] args) {
        final ActorSystem actorSystem = ActorSystem.create();
        final EsConnection esConnection = EsConnectionFactory.create(actorSystem);

        // Observer that only reports what happens on the subscription.
        final SubscriptionObserver<IndexedEvent> loggingObserver = new SubscriptionObserver<IndexedEvent>() {
            @Override
            public void onLiveProcessingStart(Closeable handle) {
                actorSystem.log().info("live processing started");
            }

            @Override
            public void onEvent(IndexedEvent received, Closeable handle) {
                actorSystem.log().info(received.toString());
            }

            @Override
            public void onError(Throwable failure) {
                actorSystem.log().error(failure.toString());
            }

            @Override
            public void onClose() {
                actorSystem.log().error("subscription closed");
            }
        };

        // NOTE(review): the trailing arguments are presumably resolveLinkTos and credentials — confirm against the API.
        final Closeable subscription = esConnection.subscribeToAll(loggingObserver, false, null);
    }
}

Build event

import java.util.UUID;
import eventstore.core.EventData;
import eventstore.j.EventDataBuilder;

/**
 * Shows the different ways an {@code EventData} can be assembled with {@code EventDataBuilder}:
 * with no payload, with binary payload, with string payload and with JSON payload.
 */
public class EventDataBuilderExample {

    // Only the event type is supplied; no id, data or metadata is set explicitly.
    final EventData empty = new EventDataBuilder("eventType").build();

    // Data and metadata given as raw byte arrays.
    final EventData binary = new EventDataBuilder("binary")
            .eventId(UUID.randomUUID())
            .data(new byte[]{1, 2, 3, 4})
            .metadata(new byte[]{5, 6, 7, 8})
            .build();

    // Data and metadata given as plain strings.
    final EventData string = new EventDataBuilder("string")
            .eventId(UUID.randomUUID())
            .data("data")
            .metadata("metadata")
            .build();

    // Data and metadata given as JSON documents via the json-specific setters.
    final EventData json = new EventDataBuilder("json")
            .eventId(UUID.randomUUID())
            .jsonData("{\"data\":\"data\"}")
            .jsonMetadata("{\"metadata\":\"metadata\"}")
            .build();
}

Scala examples

Read event

import java.net.InetSocketAddress
import _root_.akka.actor._
import _root_.akka.actor.Status.Failure
import eventstore.akka.tcp.ConnectionActor

/** Reads the first event of "my-stream" through a `ConnectionActor` and logs the reply. */
object ReadEventExample extends App {
  val system = ActorSystem()

  // Connect to 127.0.0.1:1113 using the admin/changeit credentials.
  val settings = Settings(
    address = new InetSocketAddress("127.0.0.1", 1113),
    defaultCredentials = Some(UserCredentials("admin", "changeit"))
  )

  val connection = system.actorOf(ConnectionActor.props(settings))

  // Implicit so that `!` below uses it as the sender; the reply is therefore sent to ReadResult.
  // The type is written out because implicit definitions should not rely on inference
  // (Scala 3 requires the annotation, Scala 2.13 warns about its absence).
  implicit val readResult: ActorRef = system.actorOf(Props[ReadResult]())

  connection ! ReadEvent(EventStream.Id("my-stream"), EventNumber.First)

  /** Logs the read outcome and then terminates the actor system. */
  class ReadResult extends Actor with ActorLogging {
    def receive = {
      case ReadEventCompleted(event) =>
        log.info("event: {}", event)
        shutdown()

      case Failure(e: EsException) =>
        // Pass the exception as the cause so the stack trace is logged, not just its string form.
        log.error(e, e.toString)
        shutdown()
    }

    // terminate() returns a Future; the trailing () discards it so the method returns Unit.
    def shutdown(): Unit = { context.system.terminate(); () }
  }
}

Write event

import _root_.akka.actor.Status.Failure
import _root_.akka.actor.{ ActorLogging, Actor, Props, ActorSystem }
import eventstore.core.util.uuid.randomUuid
import eventstore.akka.tcp.ConnectionActor

object WriteEventExample extends App {

  val system      = ActorSystem()
  val connection  = system.actorOf(ConnectionActor.props())
  val event       = EventData("my-event", eventId = randomUuid, data = Content("my event data"), metadata = Content("my first event"))

  implicit val writeResult = system.actorOf(Props(WriteResult))

  connection ! WriteEvents(EventStream.Id("my-stream"), List(event))

  case object WriteResult extends Actor with ActorLogging {

    def receive = {
  
View on GitHub
GitHub Stars222
CategoryDevelopment
Updated15d ago
Forks37

Languages

Scala

Security Score

95/100

Audited on Mar 23, 2026

No findings