- Introduction
- Supported types
- Supported operations
- Defining modules
- Deploying modules
- Publishing events
- Java Flight Recorder support
- Requirements
- Installation
- Running Integration Test
vertx-mongodb-effect allows us to work with MongoDB following a purely functional and reactive style. It requires familiarity with vertx-effect. Both vertx-effect and vertx-mongodb-effect use the immutable and persistent Json from json-values. Jsons travel across the event bus, from verticle to verticle, back and forth, without being copied or converted to BSON. The vertx codecs to send the Json from json-values to the event bus are in vertx-values, which is a dependency of vertx-effect.
json-values supports the standard Json types: string, number, boolean, null, object, and array. There are five number specializations: int, long, double, decimal, and BigInteger. json-values adds support for instants and binary data. It serializes instants into their string representation according to ISO-8601, and binary data into a Base64-encoded string.
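To make the two string forms concrete, here is a minimal sketch that uses only the JDK. The class WireFormats and its methods are hypothetical names for illustration, and json-values may use a different formatter or Base64 variant internally; the sketch only shows what an ISO-8601 instant and a Base64 string look like.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;

// Hypothetical helper, not part of json-values
final class WireFormats {

    // ISO-8601 text for an instant; Instant.toString() produces this format
    static String instantToString(Instant instant) {
        return instant.toString();
    }

    // Base64 text for raw bytes, using the basic (RFC 4648) alphabet
    static String bytesToBase64(byte[] bytes) {
        return Base64.getEncoder().encodeToString(bytes);
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2021-01-01T10:15:30Z");
        System.out.println(instantToString(instant));   // 2021-01-01T10:15:30Z

        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(bytesToBase64(bytes));       // aGVsbG8=
    }
}
```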
vertx-mongodb-effect uses mongo-values. It abstracts the processes of encoding to BSON and decoding from BSON. Please find below the BSON types supported and their equivalent types in json-values.
Map<BsonType, Class<?>> map = new HashMap<>();
map.put(BsonType.NULL, JsNull.class);
map.put(BsonType.ARRAY, JsArray.class);
map.put(BsonType.BINARY, JsBinary.class);
map.put(BsonType.BOOLEAN, JsBool.class);
map.put(BsonType.DATE_TIME, JsInstant.class);
map.put(BsonType.DOCUMENT, JsObj.class);
map.put(BsonType.DOUBLE, JsDouble.class);
map.put(BsonType.INT32, JsInt.class);
map.put(BsonType.INT64, JsLong.class);
map.put(BsonType.DECIMAL128, JsBigDec.class);
map.put(BsonType.STRING, JsStr.class);
When defining the mongodb settings, you have to specify the codec registry JsValuesRegistry from mongo-values:
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import mongovalues.JsValuesRegistry;
MongoClientSettings  settings =
             MongoClientSettings.builder()
                                .applyConnectionString(connString)
                                .codecRegistry(JsValuesRegistry.INSTANCE)
                                .build();
Every method of the MongoDB driver has an associated lambda.
Since vertx-mongodb-effect uses the driver API directly, it can benefit from all its features and methods. It's an advantage over the official vertx-mongodb-client.
Please find below the types and constructors of the most essential operations:
Count :: Lambdac<JsObj, Long>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.CountOptions;
public Count(Supplier<MongoCollection<JsObj>> collectionSupplier,
             CountOptions options
            )
DeleteMany :: Lambdac<JsObj, O>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.result.DeleteResult;
public DeleteMany(Supplier<MongoCollection<JsObj>> collectionSupplier,
                  Function<DeleteResult, O> resultConverter,
                  DeleteOptions options 
                 )
                      
DeleteOne :: Lambdac<JsObj, O>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.result.DeleteResult;
public DeleteOne(Supplier<MongoCollection<JsObj>> collectionSupplier,
                 Function<DeleteResult, O> resultConverter,
                 DeleteOptions options 
                )
                      
FindAll :: Lambdac<FindMessage, JsArray>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.FindIterable;
public FindAll(Supplier<MongoCollection<JsObj>> collectionSupplier,
               Function<FindIterable<JsObj>, JsArray> converter 
              )
FindOne :: Lambdac<FindMessage, JsObj>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.FindIterable;
public FindOne(Supplier<MongoCollection<JsObj>> collectionSupplier,
               Function<FindIterable<JsObj>, JsObj> converter 
              )                 
FindOneAndDelete :: Lambdac<JsObj, JsObj>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.FindOneAndDeleteOptions;
public FindOneAndDelete(Supplier<MongoCollection<JsObj>> collectionSupplier,
                        FindOneAndDeleteOptions options
                       ) 
FindOneAndReplace :: Lambdac<UpdateMessage, JsObj>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.FindOneAndReplaceOptions;
public FindOneAndReplace(Supplier<MongoCollection<JsObj>> collectionSupplier,
                         FindOneAndReplaceOptions options
                        )   
FindOneAndUpdate :: Lambdac<UpdateMessage, JsObj>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.FindOneAndUpdateOptions;
public FindOneAndUpdate(Supplier<MongoCollection<JsObj>> collectionSupplier,
                        FindOneAndUpdateOptions options
                       )
InsertMany :: Lambdac<JsArray, R>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.InsertManyResult;
import com.mongodb.client.model.InsertManyOptions;
public InsertMany(Supplier<MongoCollection<JsObj>> collectionSupplier,
                  Function<InsertManyResult, R> resultConverter,
                  InsertManyOptions options
                 )   
InsertOne :: Lambdac<JsObj, R>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.model.InsertOneOptions;
public InsertOne(Supplier<MongoCollection<JsObj>> collectionSupplier,
                 Function<InsertOneResult, R> resultConverter,
                 InsertOneOptions options
                )    
ReplaceOne :: Lambdac<UpdateMessage, O>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.model.ReplaceOptions;
public ReplaceOne(Supplier<MongoCollection<JsObj>> collectionSupplier,
                  Function<UpdateResult, O> resultConverter,
                  ReplaceOptions options
                 )   
UpdateMany :: Lambdac<UpdateMessage, O>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.model.UpdateOptions;
public UpdateMany(Supplier<MongoCollection<JsObj>> collectionSupplier,
                  Function<UpdateResult, O> resultConverter,
                  UpdateOptions options
                 )
UpdateOne :: Lambdac<UpdateMessage, O>
import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.model.UpdateOptions;
public UpdateOne(Supplier<MongoCollection<JsObj>> collectionSupplier,
                 Function<UpdateResult, O> resultConverter,
                 UpdateOptions options
                )
As with vertx-effect, modules deploy verticles and expose lambdas to communicate with them. The typical scenario is to create a module per collection. We can deploy or spawn verticles.
The following modules are just a couple of examples.
We create a module where all the lambdas make read operations and spawn verticles to reach a significant level of parallelization:
import vertx.mongodb.effect.MongoModule;
import vertx.effect.Lambdac;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.ThreadingModel;
public class ReadModule extends MongoModule {
    public ReadModule(final Supplier<MongoCollection<JsObj>> collection) {
        super(collection,
              new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD)
             );
    }
    public static Lambdac<FindMessage, Optional<JsObj>> findOne;
    public static Lambdac<FindMessage, JsArray> findAll;
    public static Lambdac<JsObj, Long> count;
    public static Lambdac<JsArray, JsArray> aggregate;
    @Override
    protected void deploy() {}  
    @Override
    protected void initialize() {
        Lambdac<FindMessage, JsObj> findOneLambda = vertxRef.spawn("find_one",
                                                                   new FindOne(collection)
                                                                  );
        this.findOne = (context,message) -> findOneLambda.apply(context,message)
                                                         .map(Optional::ofNullable);
        this.findAll = vertxRef.spawn("find_all",
                                      new FindAll(collection)
                                     );
        this.count = vertxRef.spawn("count",
                                    new Count(collection)
                                   );
        this.aggregate = vertxRef.spawn("aggregate",
                                        new Aggregate<>(collection,
                                                        Converters.aggregateResult2JsArray
                                                       )
                                       );
    }
}
We create a module where all the lambdas make delete, insert and update operations, and deploy only one instance per verticle.
import vertx.mongodb.effect.MongoModule;
import vertx.effect.Lambdac;
public class MyCollectionModule extends MongoModule {
    public MyCollectionModule(final Supplier<MongoCollection<JsObj>> collection) {
        super(collection);
    }
    public static Lambdac<JsObj, String> insertOne;
    public static Lambdac<JsObj, JsObj> deleteOne;
    public static Lambdac<UpdateMessage, JsObj> replaceOne;
    public static Lambdac<UpdateMessage, JsObj> updateOne;
    @Override
    protected void deploy() {
        this.deploy(INSERT_ONE_ADDRESS,
                    new InsertOne<>(collection,
                                    Converters.insertOneResult2HexId
                                   )
                   );
        this.deploy(DELETE_ONE_ADDRESS,
                    new DeleteOne<>(collection,
                                    Converters.deleteResult2JsObj
                                   )
                   );
        this.deploy(REPLACE_ONE_ADDRESS,
                    new ReplaceOne<>(collection,
                                     Converters.updateResult2JsObj
                                    )
                   );
        this.deploy(UPDATE_ONE_ADDRESS,
                    new UpdateOne<>(collection,
                                    Converters.updateResult2JsObj
                                   )
                   );
    }  
    @Override
    protected void initialize() {
        this.insertOne = this.trace(INSERT_ONE_ADDRESS);
        this.deleteOne = this.trace(DELETE_ONE_ADDRESS);
        this.replaceOne = this.trace(REPLACE_ONE_ADDRESS);
        this.updateOne = this.trace(UPDATE_ONE_ADDRESS);
    }
    private static final String DELETE_ONE_ADDRESS = "delete_one";   
    private static final String UPDATE_ONE_ADDRESS = "update_one";
    private static final String REPLACE_ONE_ADDRESS = "replace_one";
    private static final String INSERT_ONE_ADDRESS = "insert_one";
    private static final String INSERT_MANY_ADDRESS = "insert_all";
    private static final String DELETE_MANY_ADDRESS = "delete_all";
}
The verticles RegisterMongoEffectCodecs and RegisterJsValuesCodecs need to be deployed to register the vertx message codecs. Remember that you can't send just any message to the event bus. If a message type is not supported by Vertx, you have to create a MessageCodec for it.
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import vertx.mongodb.effect.MongoVertxClient;
import vertx.effect.VertxRef;
// define every timeout if you wanna be reactive
int connectTimeoutMS =  ???;
int socketTimeoutMS = ???;
int serverSelectionTimeoutMS = ???;
String connectionUrl = 
     String.format("mongodb://localhost:27017/?connectTimeoutMS=%s&socketTimeoutMS=%s&serverSelectionTimeoutMS=%s",
                   connectTimeoutMS,
                   socketTimeoutMS,
                   serverSelectionTimeoutMS 
                  );
 
ConnectionString connString = new ConnectionString(connectionUrl);
MongoClientSettings  settings =
             MongoClientSettings.builder()
                                .applyConnectionString(connString)
                                .codecRegistry(JsValuesRegistry.INSTANCE)
                                .build();
// one vertx client per database connection 
MongoVertxClient mongoClient = new MongoVertxClient(settings);
String database = ???;
String collection = ???; 
MyCollectionModule collectionModule = 
          new MyCollectionModule(mongoClient.getCollection(database,
                                                           collection
                                                          )
                                );
VertxRef vertxRef = new VertxRef(vertx);
Quadruple.sequential(vertxRef.deployVerticle(new RegisterJsValuesCodecs()),
                     vertxRef.deployVerticle(new RegisterMongoEffectCodecs()),
                     vertxRef.deployVerticle(mongoClient),
                     vertxRef.deployVerticle(collectionModule)
                     ) 
         .get();
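The snippet above leaves the three timeouts as placeholders. As a sketch of how the connection URL could be assembled once you have picked values, the following uses only the JDK. The class MongoUrlSketch, the method withTimeouts, and the numbers in main are illustrative assumptions, not recommendations from the library.

```java
import java.util.Locale;

// Hypothetical helper, not part of vertx-mongodb-effect
final class MongoUrlSketch {

    // Builds a MongoDB connection URL with the three timeouts set explicitly
    static String withTimeouts(String hostAndPort,
                               int connectTimeoutMS,
                               int socketTimeoutMS,
                               int serverSelectionTimeoutMS) {
        // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale
        return String.format(Locale.ROOT,
                             "mongodb://%s/?connectTimeoutMS=%d&socketTimeoutMS=%d&serverSelectionTimeoutMS=%d",
                             hostAndPort,
                             connectTimeoutMS,
                             socketTimeoutMS,
                             serverSelectionTimeoutMS);
    }

    public static void main(String[] args) {
        // Illustrative values only; tune them for your deployment
        System.out.println(withTimeouts("localhost:27017", 5000, 10000, 5000));
    }
}
```

The resulting string can be passed to new ConnectionString(...) exactly as in the snippet above.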
Once everything is up and running, enjoy your lambdas!
BiFunction<Integer,String,Val<Optional<JsObj>>> findByCode = (attempts,code) ->
          MyCollectionModule.findOne
                            .apply(FindMessage.ofFilter(JsObj.of("code",
                                                                 JsStr.of(code)
                                                                )
                                                        ) 
                                   )                    
                            .retry(e -> Failures.anyOf(MONGO_CONNECT_TIMEOUT_CODE,
                                                       MONGO_READ_TIMEOUT_CODE
                                                      ),
                                   attempts
                                  )
                            .recoverWith(e -> Val.succeed(Optional.empty()));
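The retry-then-recover shape used above can be hard to picture without the library at hand. The sketch below reproduces the same idea synchronously with plain JDK types: retry only on failures accepted by a predicate, and turn any remaining failure into an empty Optional. RetrySketch and retryThenRecover are hypothetical names, and treating the count as "additional retries" is an assumption; the actual semantics of Val.retry are described in the vertx-effect docs.

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical, synchronous stand-in for the retry/recoverWith chain
final class RetrySketch {

    // Runs the task; on a failure accepted by retryOn, tries again up to
    // maxRetries more times. Any failure that remains becomes Optional.empty().
    static <T> Optional<T> retryThenRecover(Supplier<T> task,
                                            Predicate<RuntimeException> retryOn,
                                            int maxRetries) {
        int retriesLeft = maxRetries;
        while (true) {
            try {
                return Optional.ofNullable(task.get());
            } catch (RuntimeException e) {
                if (retriesLeft > 0 && retryOn.test(e)) {
                    retriesLeft--;            // retryable failure: go around again
                } else {
                    return Optional.empty();  // recover with an empty result
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice with a "timeout", then succeeds on the third call
        Optional<String> result = retryThenRecover(
                () -> {
                    if (++calls[0] < 3) throw new IllegalStateException("timeout");
                    return "found";
                },
                e -> e instanceof IllegalStateException,
                2);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```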
Since vertx-effect publishes the most critical events to the address vertx-effect-events, it's possible to register consumers to exploit that information. You can disable this feature with the Java system property -Dpublish.events=false. Thanks to Lambdac, it's possible to correlate different events that belong to the same transaction. Go to the vertx-effect doc for further details.
Since vertx-effect supports JFR, all the verticle messages have an associated event and can be visualized using Java Mission Control.
Fields of a verticle message event:
- address: Address of the Verticle where the message is sent to
- result: SUCCESS or FAILURE, depending on what the caller receives
- failure code: In case the failure is a ReplyException, it's the failure code
- failure type: In case the failure is a ReplyException, it's the failure type
- failure message: In case the failure is a ReplyException, it's the failure message
- exception class: In case the failure is not a ReplyException, it's the exception class name
- exception message: In case the failure is not a ReplyException, it's the exception message
- duration: time since the message is sent until the response is received
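The fields listed above map naturally onto a custom JFR event. The sketch below uses the JDK's jdk.jfr API to show roughly what such an event could look like. VerticleMessageEvent, the event name, and the helper JfrSketch.record are hypothetical and are not the event class that vertx-effect actually defines. The failure code, type, and message fields are omitted for brevity, and the duration is captured by JFR itself between begin() and end().

```java
import jdk.jfr.Category;
import jdk.jfr.Event;
import jdk.jfr.Label;
import jdk.jfr.Name;

// Hypothetical event; vertx-effect's real event class may differ
@Name("example.VerticleMessage")
@Label("Verticle Message")
@Category("Example")
class VerticleMessageEvent extends Event {
    @Label("Address") String address;
    @Label("Result") String result;
    @Label("Exception Class") String exceptionClass;
    @Label("Exception Message") String exceptionMessage;
}

final class JfrSketch {

    // Wraps an action in an event, filling the fields from its outcome
    static VerticleMessageEvent record(String address, Runnable send) {
        VerticleMessageEvent event = new VerticleMessageEvent();
        event.address = address;
        event.begin();                       // start of the measured duration
        try {
            send.run();
            event.result = "SUCCESS";
        } catch (RuntimeException e) {
            event.result = "FAILURE";
            event.exceptionClass = e.getClass().getName();
            event.exceptionMessage = e.getMessage();
        } finally {
            event.end();                     // end of the measured duration
            event.commit();                  // written only if a recording is active
        }
        return event;
    }

    public static void main(String[] args) {
        VerticleMessageEvent event = record("find_one", () -> { });
        System.out.println(event.address + " -> " + event.result);
    }
}
```

To see such events in Java Mission Control, the JVM has to be started with a recording enabled, for example with -XX:StartFlightRecording.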
- Java 17 or greater
- vertx-effect
- mongodb-driver-sync
- mongo-values
For Java 17 or higher:
<dependency>
   <groupId>com.github.imrafaelmerino</groupId>
   <artifactId>vertx-mongodb-effect</artifactId>
   <version>2.0.0</version>
</dependency>
For Java 21 or higher:
<dependency>
  <groupId>com.github.imrafaelmerino</groupId>
  <artifactId>vertx-mongodb-effect</artifactId>
  <version>3.0.0</version>
</dependency>
Before executing the integration tests, ensure that a Mongo replica set is up and running.
#Edit this path to your particular case
PROJECT_HOME="/Users/rmerino/Projects/vertx-mongodb-effect"
CONFIGURATION_FILE_PATH="${PROJECT_HOME}/src/test/resources/conf.yml"
if [ ! -f "$CONFIGURATION_FILE_PATH" ]; then
    echo "The configuration file ${CONFIGURATION_FILE_PATH} does not exist."
    exit 1
fi
docker run -d -p 27017:27017 \
-v ${CONFIGURATION_FILE_PATH}:/etc/conf.yml \
--name mongo1 mongo --config "/etc/conf.yml"
docker run -d -p 27018:27017 \
-v ${CONFIGURATION_FILE_PATH}:/etc/conf.yml \
--name mongo2 mongo --config "/etc/conf.yml"
docker run -d -p 27019:27017 \
-v ${CONFIGURATION_FILE_PATH}:/etc/conf.yml \
--name mongo3 mongo --config "/etc/conf.yml" 
#let's configure the replica opening the Mongo console
docker exec -it mongo1 mongosh
Then execute the following command, where 192.168.1.64 is my IP (don't use localhost):
rs.initiate(
       {
         _id: "rs0",
         members: [
           { _id: 0, host: "192.168.1.64:27017" },
           { _id: 1, host: "192.168.1.64:27018" },
           { _id: 2, host: "192.168.1.64:27019" }
         ]
       }
     )
Once the Mongo replica set is up and running, you're ready to run the integration tests.
To run the tests, use one of the following Maven commands:
- mvn failsafe:integration-test (runs the integration tests only)
- mvn verify (runs both unit and integration tests)
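For reference, a client that connects to the replica set configured above lists the members in its connection URI and names the set with the replicaSet option. The sketch below builds such a URI with plain JDK code. ReplicaSetUri is a hypothetical helper, and whether the project's integration tests read a URI of exactly this shape is an assumption.

```java
import java.util.List;

// Hypothetical helper, not part of vertx-mongodb-effect
final class ReplicaSetUri {

    // Joins the member hosts and appends the replica set name
    static String of(String replicaSetName, List<String> hosts) {
        return "mongodb://" + String.join(",", hosts) + "/?replicaSet=" + replicaSetName;
    }

    public static void main(String[] args) {
        // Same hosts and set name as in the rs.initiate call above
        System.out.println(of("rs0",
                              List.of("192.168.1.64:27017",
                                      "192.168.1.64:27018",
                                      "192.168.1.64:27019")));
    }
}
```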
