
Spring Data R2DBC and Kotlin Coroutines

27 Feb, 2023

Combined, Spring Data R2DBC and Kotlin Coroutines offer a seamless and efficient way to perform non-blocking database operations. In this blog post, we explore the different options for non-blocking database access with these two tools inside a reactive stack, and we write an end-to-end feature using Spring WebFlux, Kotlin Coroutines, and Spring Data R2DBC.

On the one hand, Spring Data R2DBC is a tool that allows developers to access databases using a reactive programming model. This can help improve an application’s performance, especially when dealing with IO-bound tasks such as database accesses and high concurrency.

On the other hand, Kotlin Coroutines are a tool for writing asynchronous and non-blocking code. They allow you to write code that is easy to read and understand while still being efficient and non-blocking.

Reactive vs. Coroutines

Reactive programming is a programming paradigm that handles asynchronous data streams and events by using data flows and propagation of change. In the context of the Spring framework, Spring WebFlux is a reactive web module that allows developers to build reactive web applications inside the Spring ecosystem.

Reactor is the fully non-blocking library of choice for Spring Reactive Web (WebFlux). It is based on the Reactive Streams specification, and it provides a Reactive API with types such as Mono and Flux, which encapsulate a stream of values that is computed asynchronously (at most one value for Mono, any number for Flux).

Kotlin Coroutines, in contrast, provide a simple and intuitive programming model for writing non-blocking code in an imperative and sequential style. Moreover, Coroutines work well with non-blocking frameworks such as Spring WebFlux: the Reactive API can be converted to suspend functions, and types such as Flow are available for data streaming.
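To make the contrast concrete, here is a minimal sketch of the same logic written in both styles. It assumes the kotlinx-coroutines-reactor library is on the classpath; findNameReactive and the other function names are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical lookup that exposes a Reactor type.
fun findNameReactive(id: Long): Mono<String> = Mono.just("user-$id")

// Reactive style: the transformation is expressed as an operator in a chain.
fun greetingReactive(id: Long): Mono<String> =
    findNameReactive(id).map { name -> "Hello, $name" }

// Coroutine style: the same logic reads sequentially; awaitSingle() suspends without blocking a thread.
suspend fun greetingSuspending(id: Long): String {
    val name = findNameReactive(id).awaitSingle()
    return "Hello, $name"
}

// Going the other way: expose a suspend function as a Mono.
fun greetingAsMono(id: Long): Mono<String> = mono { greetingSuspending(id) }

// A Flux can be consumed as a Flow.
fun idsAsFlow(): Flow<Int> = Flux.range(1, 3).asFlow()

fun main(): Unit = runBlocking {
    println(greetingReactive(1).awaitSingle()) // Hello, user-1
    println(greetingSuspending(1))             // Hello, user-1
    println(greetingAsMono(2).awaitSingle())   // Hello, user-2
    println(idsAsFlow().toList())              // [1, 2, 3]
}
```

runBlocking is used here only to have an entry point for the demo; inside a WebFlux application the framework calls the suspend functions for us.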

Spring Data R2DBC

Now, let’s understand what R2DBC stands for. R2DBC is an acronym for Reactive Relational Database Connectivity. It is a specification that defines how a reactive programming model can be applied to database access. This allows developers to work with databases using a non-blocking programming model.

Spring Data R2DBC is part of the Spring Data family and brings its repository abstractions to R2DBC. It does not implement the specification itself; that is the job of an R2DBC driver, such as the PostgreSQL driver used in this post. It provides a simple and consistent API for working with databases using the Spring ecosystem. This means that developers can use the same familiar Spring Data annotations and interfaces to work with databases, but with the added benefits of a reactive stack.

Getting started

To get started with a reactive Spring application, you must first add the appropriate dependencies to your project. If you are starting a new project from scratch, a good approach is to create the project with Spring Initializr and choose Spring Reactive Web (WebFlux), Spring Data R2DBC, and a driver, for example, the PostgreSQL Driver.

But this time, we will add the needed dependencies manually, also adding the required dependencies for Kotlin Coroutines and extensions for Reactor.

dependencies {  
   implementation("org.springframework.boot:spring-boot-starter-webflux")
   implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
   implementation("org.postgresql:r2dbc-postgresql")
   implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
   implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
}

Here, we are importing Spring WebFlux, Spring Data R2DBC, the R2DBC PostgreSQL Driver, and the Kotlin Coroutines libraries. The last two dependencies, the Kotlin extensions for Reactor and the Coroutines support for Reactor, let us translate from the Reactor API to imperative Coroutines code.

After that, you might need a database instance. In this example, we use Docker Compose to create a Docker container with a PostgreSQL database. For that, we create a docker-compose.yml file inside the project:

services:  
  postgres:  
    image: postgres:15.0-alpine  
    hostname: postgres  
    ports:  
      - 5432:5432
    restart: always  
    environment:  
      POSTGRES_DB: testDb  
      POSTGRES_USER: someUser  
      POSTGRES_PASSWORD: somePassword

Then, we simply run the command docker-compose up -d in the terminal to run it in the background.

Database configuration

Once we have the dependencies in place and the database running, it’s time to create the database configuration. We could choose to use Spring YAML Configuration, but, for this example, we will do it programmatically and using the default Postgres port (5432):

@Configuration  
@EnableR2dbcRepositories  
class DatabaseConfig : AbstractR2dbcConfiguration() {  
    @Bean
    override fun connectionFactory(): ConnectionFactory =  
        PostgresqlConnectionFactory(  
            PostgresqlConnectionConfiguration.builder()  
                .host("localhost")  
                .database("testDb")  
                .username("someUser")  
                .password("somePassword")  
                .build()  
        )  
}

First, we use Spring’s @Configuration annotation to mark the class as a source of bean definitions, while @EnableR2dbcRepositories activates reactive relational repositories using R2DBC. The configuration class extends AbstractR2dbcConfiguration, which requires us to provide a ConnectionFactory through connectionFactory(); here we return one built from our database settings. Finally, we mark it as a @Bean so that it can be injected later.

Inside the same DatabaseConfig class, we will also create a ConnectionFactoryInitializer for initializing the connection. The previously defined ConnectionFactory is injected as a parameter of the bean method, and the initializer populates the database when the application starts:

@Bean  
fun databaseInitializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer =  
    ConnectionFactoryInitializer().apply {  
        setConnectionFactory(connectionFactory)  
        setDatabasePopulator(  
            CompositeDatabasePopulator().apply {  
                addPopulators(  
                    ResourceDatabasePopulator(  
                        ClassPathResource("sql/schema.sql"),  
                        ClassPathResource("sql/data.sql")  
                    )  
                )  
            }  
        )  
    }

Inside a resources/sql folder, we define a schema.sql file for creating the table:

CREATE TABLE IF NOT EXISTS users (  
     id BIGSERIAL PRIMARY KEY,  
     username VARCHAR NOT NULL,  
     email VARCHAR NOT NULL,  
     CONSTRAINT unique_user UNIQUE (username, email)  
);

We define a data.sql with some default test data:

INSERT INTO users (username, email) VALUES ('testuser', 'testuser@test.com') ON CONFLICT DO NOTHING;

We also define a User data transfer object (DTO), adding the annotations for mapping our code to the database:

@Table("users")
data class User(
    @Id val id: Long,
    @Column("username") val username: String,
    @Column("email") val email: String,
)

Repository

Now let’s explore our different options for operating with the database in a non-blocking way using Kotlin Coroutines in the Repository Layer.

DatabaseClient

DatabaseClient is a non-blocking, reactive client for performing database calls with reactive streams back pressure, and Spring Data R2DBC provides Coroutines extensions for it.

For example:

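import org.springframework.r2dbc.core.DatabaseClient
import org.springframework.r2dbc.core.awaitOneOrNull
import org.springframework.stereotype.Repository

@Repository
class UserRepository(private val client: DatabaseClient) {
    suspend fun findById(id: Long): User? =
        client
            // Bind the id as a named parameter instead of interpolating it into the SQL string
            .sql("SELECT * FROM users WHERE id = :id")
            .bind("id", id)
            .map { row ->
                User(
                    row.get("id") as Long,
                    row.get("username") as String,
                    row.get("email") as String,
                )
            }
            .awaitOneOrNull()
}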

In this example, we first define a UserRepository class to perform database operations using DatabaseClient. The findById function takes an id parameter and returns a nullable User object. Inside the method, we use client.sql to execute a SQL query that selects the user from the database by its id. After that, we use the map method to map each row of the result to a User object. Finally, we use awaitOneOrNull() to wait for the result of the operation, returning null if there is no result.

If you are familiar with the Spring reactive stack, you might notice there is no need to wrap the value inside a Mono; instead, findById is marked as suspend. The suspend keyword in Kotlin marks a suspending function, which can be called only from a coroutine or from another suspending function (that is, inside a coroutine context).
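Write operations follow the same pattern. The following sketch is not part of the original example: the class name UserEmailRepository and the updateEmail function are hypothetical, and it assumes the users table defined earlier. It binds named parameters and uses the awaitRowsUpdated() Coroutines extension to suspend until the statement completes.

```kotlin
import org.springframework.r2dbc.core.DatabaseClient
import org.springframework.r2dbc.core.awaitRowsUpdated
import org.springframework.stereotype.Repository

// Hypothetical repository, shown only to illustrate an update with DatabaseClient.
@Repository
class UserEmailRepository(private val client: DatabaseClient) {

    // Returns true when at least one row was changed.
    suspend fun updateEmail(id: Long, email: String): Boolean {
        val updatedRows = client
            .sql("UPDATE users SET email = :email WHERE id = :id")
            .bind("email", email) // values are bound, never concatenated into the SQL
            .bind("id", id)
            .fetch()
            .awaitRowsUpdated()   // suspends until the database reports the row count
        return updatedRows > 0
    }
}
```

Binding values by name keeps user input out of the SQL text, which is why it is generally preferable to string interpolation even when the parameter is a number.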

R2dbcEntityTemplate

It is also possible to use the R2dbcEntityTemplate to perform operations on entities. And, again, Spring Data R2DBC provides Coroutines extensions for it.

For example:

@Repository
class UserRepository(private val template: R2dbcEntityTemplate) {
    suspend fun findById(id: Long): User? =
        template.select(User::class.java)
            .matching(
                Query.query(
                    Criteria.where("id").`is`(id)
                )
            )
            .awaitOneOrNull()
}

This time, we define a UserRepository class that uses R2dbcEntityTemplate for the database operations. The suspend findById function takes an id parameter and returns a nullable User object. Inside the method, we call template.select(User::class.java) to start a select over the User entity. The matching method restricts the query to rows whose id column equals the id parameter. Finally, we use awaitOneOrNull() to wait for the result of the operation; it will return null if there is no result.

The main difference with the previous approach is that we are not writing SQL code this time, and we have the advantage of matching directly against the User DTO instead of mapping every single field one by one.
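The same fluent API can return several rows as a Flow. The sketch below is an illustration under assumptions: UserSearchRepository, findByEmailDomain, and the maxResults parameter are hypothetical names, while the User entity is the one defined earlier in the post.

```kotlin
import kotlinx.coroutines.flow.Flow
import org.springframework.data.domain.Sort
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate
import org.springframework.data.r2dbc.core.flow
import org.springframework.data.relational.core.query.Criteria
import org.springframework.data.relational.core.query.Query
import org.springframework.stereotype.Repository

// Hypothetical repository, shown only to illustrate a multi-row query with R2dbcEntityTemplate.
@Repository
class UserSearchRepository(private val template: R2dbcEntityTemplate) {

    // Not a suspend function: a Flow is cold, so the query runs only when the Flow is collected.
    fun findByEmailDomain(domain: String, maxResults: Int = 20): Flow<User> =
        template.select(User::class.java)
            .matching(
                Query.query(Criteria.where("email").like("%@$domain"))
                    .sort(Sort.by("username")) // order by username ascending
                    .limit(maxResults)         // cap the number of rows fetched
            )
            .flow()
}
```

A caller would consume the result from a coroutine, for example with toList() or collect { }.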

CoroutineCrudRepository

The Spring Data R2DBC project supports Coroutine repositories, which expose the non-blocking nature of data access through Kotlin’s Coroutines. To use one, define a repository interface that extends CoroutineCrudRepository.

For example:

interface UserRepository : CoroutineCrudRepository<User, Long> { 
    override suspend fun findById(id: Long): User? 
}

We define a UserRepository interface that extends CoroutineCrudRepository. The latter interface provides several useful methods for performing CRUD operations on User objects, such as save, findById, and delete.

For this example, we wouldn’t really need to define the findById function, as it is already part of the CoroutineCrudRepository interface. However, we can override it or define new custom methods in case we need to do different operations with the database.

Note the suspend keyword: a repository method must either be a suspending function or return a type that enables context propagation (like Flow); otherwise, the coroutine context would not be available.

The advantages of this approach are the same as with any Spring Data repository abstraction: it reduces the amount of boilerplate code and allows us to run queries against the database using just keywords in method names, but in a non-blocking manner with Coroutines.
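As a sketch of what such custom methods might look like, the interface below adds two queries derived from method names and one explicit query. The interface name UserLookupRepository and its methods are hypothetical and not part of the original example; they assume the User entity and users table defined earlier.

```kotlin
import kotlinx.coroutines.flow.Flow
import org.springframework.data.r2dbc.repository.Query
import org.springframework.data.repository.kotlin.CoroutineCrudRepository
import org.springframework.data.repository.query.Param

// Hypothetical repository, shown only to illustrate custom query methods.
interface UserLookupRepository : CoroutineCrudRepository<User, Long> {

    // Derived query: the method name is parsed into a WHERE username = ? AND email = ? clause.
    // The pair (username, email) is unique in the schema, so at most one row is expected.
    suspend fun findByUsernameAndEmail(username: String, email: String): User?

    // Multi-row results are returned as a Flow, so the method is not marked suspend.
    fun findAllByEmailEndingWith(suffix: String): Flow<User>

    // Explicit SQL with a named parameter, for queries that are awkward to express as a method name.
    @Query("SELECT * FROM users WHERE email = :email LIMIT 1")
    suspend fun findFirstByExactEmail(@Param("email") email: String): User?
}
```

Single-result methods are suspending and nullable, while multi-result methods return Flow; this mirrors the split between Mono and Flux in the Reactive API.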

Service

In the Service Layer, we have to define a class annotated with @Service, inject the repository, and call it from a suspending function:

@Service  
class UserService(private val repository: UserRepository) {
   suspend fun findUserById(id: Long): User? = 
       repository.findById(id)
}

As we defined each example with the same function signature, we could choose any of them to be called from this service.

Controller

In the Controller Layer, we must define a class annotated with a @Controller annotation. In our case, we use the specialized version @RestController. In the constructor, we inject the service and invoke the previously defined method from a suspending function when the /users/{id} endpoint is called:

@RestController
class UserController(private val userService: UserService) {
    @GetMapping("/users/{id}")
    suspend fun getUserById(@PathVariable id: Long): ResponseEntity<User> {
        val user = userService.findUserById(id)
        return if (user != null) ResponseEntity.ok(user)  
        else ResponseEntity.notFound().build()
    }
}

Marking the getUserById function with the suspend keyword inside the @RestController tells WebFlux to invoke it as a coroutine, so we do not need to return a Reactive API type from the endpoint.

Running and testing the example

When using Spring WebFlux, Spring Boot automatically configures Netty, a non-blocking I/O client-server framework, as the default server.

If we run the example application, we should get in the console something like this, which tells us that the application is up and running:

2023-01-25T15:26:57.692+01:00  INFO 69644 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 65 ms. Found 1 R2DBC repository interfaces.
2023-01-25T15:26:58.399+01:00  INFO 69644 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2023-01-25T15:26:58.403+01:00  INFO 69644 --- [           main] c.e.r2dbc.ExampleR2DBCApplicationKt      : Started ExampleR2DBCApplicationKt in 1.642 seconds (process running for 2.034)

Now, if we call the endpoint, we should get something like this:

GET http://localhost:8080/users/1

HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 58

{
  "id": 1,
  "username": "testuser",
  "email": "testuser@test.com"
}
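The same check can be automated. The following is a minimal sketch of an integration test, assuming spring-boot-starter-test is on the test classpath, the PostgreSQL container is running, and the seeded user has id 1; the class and test names are hypothetical.

```kotlin
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.web.reactive.server.WebTestClient

// Starts the full application on a random port and calls the real endpoint.
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class UserControllerTest(@Autowired private val webTestClient: WebTestClient) {

    @Test
    fun `returns the seeded user`() {
        webTestClient.get().uri("/users/{id}", 1)
            .exchange()
            .expectStatus().isOk
            .expectBody()
            .jsonPath("\$.id").isEqualTo(1)
    }

    @Test
    fun `returns 404 for an unknown id`() {
        // Assumes no user with this id exists in the test database.
        webTestClient.get().uri("/users/{id}", 999_999)
            .exchange()
            .expectStatus().isNotFound
    }
}
```

Because the controller returns ResponseEntity.notFound() when the service yields null, the second test exercises the null branch of the suspend chain end to end.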

Conclusions

Both Spring Data R2DBC and Kotlin Coroutines are powerful tools for building asynchronous, non-blocking applications. While Spring Data R2DBC provides a convenient way to operate with relational databases using reactive programming, it also requires a certain mindset and approach, such as sticking to reactive chains and subscribing to values, which can take some time to get used to.

Kotlin Coroutines, on the other hand, provide a more familiar and intuitive approach to writing asynchronous code, allowing developers to use typical programming patterns and write code that looks like it is being executed sequentially.

We have only explored some basics of those concepts and shown the different ways to interact with the database using Spring Data R2DBC and Coroutines. The integration into a Spring project is relatively easy because libraries such as kotlinx-coroutines-reactive and kotlinx-coroutines-reactor allow us to convert from the Reactive API to the Coroutines API.

Also, in the future, we could examine other ways to take advantage of Coroutines in the rest of the Spring ecosystem or even other frameworks.

If you want to expand your knowledge related to this, I recommend reviewing the documentation of all the tools mentioned in the post for further details.
