spring-data-cassandra-reactive
Spring Data with Reactive Cassandra
1. Introduction
Particularly, this is the third article of the Spring Data Cassandra article series. In this one, we’ll expose a Cassandra database using a REST API.
2. Maven Dependencies
As a matter of fact, Spring Data Cassandra supports Project Reactor and RxJava reactive types. To demonstrate, we’ll use the Project reactor’s reactive types Flux and Mono in this tutorial.
To start with, let’s add the dependencies needed for our tutorial:
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-cassandra</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
The latest version of the spring-data-cassandra can be found here.
Now, we’re going to expose SELECT operations from the database via a REST API. So, let’s add the dependency for RestController, too:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
3. Implementing our App
@Table
public class Employee {
@PrimaryKey
private int id;
private String name;
private String address;
private String email;
private int age;
}
Next, its time to create an EmployeeRepository that extends from ReactiveCassandraRepository. It’s important to note that this interface enables the support for reactive types:
public interface EmployeeRepository extends ReactiveCassandraRepository<Employee, Integer> {
@AllowFiltering
Flux<Employee> findByAgeGreaterThan(int age);
}
3.1. Rest Controller for CRUD Operations
For the purpose of illustration, we’ll expose some basic SELECT operations using a simple Rest Controller:
@RestController
@RequestMapping("employee")
public class EmployeeController {
@Autowired
EmployeeService employeeService;
@PostConstruct
public void saveEmployees() {
List<Employee> employees = new ArrayList<>();
employees.add(new Employee(123, "John Doe", "Delaware", "[email protected]", 31));
employees.add(new Employee(324, "Adam Smith", "North Carolina", "[email protected]", 43));
employees.add(new Employee(355, "Kevin Dunner", "Virginia", "[email protected]", 24));
employees.add(new Employee(643, "Mike Lauren", "New York", "[email protected]", 41));
employeeService.initializeEmployees(employees);
}
@GetMapping("/list")
public Flux<Employee> getAllEmployees() {
Flux<Employee> employees = employeeService.getAllEmployees();
return employees;
}
@GetMapping("/{id}")
public Mono<Employee> getEmployeeById(@PathVariable int id) {
return employeeService.getEmployeeById(id);
}
@GetMapping("/filterByAge/{age}")
public Flux<Employee> getEmployeesFilterByAge(@PathVariable int age) {
return employeeService.getEmployeesFilterByAge(age);
}
}
Finally, let’s add a simple EmployeeService:
@Service
public class EmployeeService {
@Autowired
EmployeeRepository employeeRepository;
public void initializeEmployees(List<Employee> employees) {
Flux<Employee> savedEmployees = employeeRepository.saveAll(employees);
savedEmployees.subscribe();
}
public Flux<Employee> getAllEmployees() {
Flux<Employee> employees = employeeRepository.findAll();
return employees;
}
public Flux<Employee> getEmployeesFilterByAge(int age) {
return employeeRepository.findByAgeGreaterThan(age);
}
public Mono<Employee> getEmployeeById(int id) {
return employeeRepository.findById(id);
}
}
4. Testing the Endpoints
4.1. Manual Testing
curl localhost:8080/employee/list
As a result, we get all the employees:
[
{
"id": 324,
"name": "Adam Smith",
"address": "North Carolina",
"email": "[email protected]",
"age": 43
},
{
"id": 123,
"name": "John Doe",
"address": "Delaware",
"email": "[email protected]",
"age": 31
},
{
"id": 355,
"name": "Kevin Dunner",
"address": "Virginia",
"email": "[email protected]",
"age": 24
},
{
"id": 643,
"name": "Mike Lauren",
"address": "New York",
"email": "[email protected]",
"age": 41
}
]
Moving on, let’s try to find a specific employee by his id:
curl localhost:8080/employee/643
As a result, we get Mr. Mike Lauren back:
{
"id": 643,
"name": "Mike Lauren",
"address": "New York",
"email": "[email protected]",
"age": 41
}
Finally, let’s see if our age filter works:
curl localhost:8080/employee/filterByAge/35
And as expected, we get all the employees whose age is greater than 35:
[
{
"id": 324,
"name": "Adam Smith",
"address": "North Carolina",
"email": "[email protected]",
"age": 43
},
{
"id": 643,
"name": "Mike Lauren",
"address": "New York",
"email": "[email protected]",
"age": 41
}
]
4.2. Integration Testing
@RunWith(SpringRunner.class)
@SpringBootTest
public class ReactiveEmployeeRepositoryIntegrationTest {
@Autowired
EmployeeRepository repository;
@Before
public void setUp() {
Flux<Employee> deleteAndInsert = repository.deleteAll()
.thenMany(repository.saveAll(Flux.just(
new Employee(111, "John Doe", "Delaware", "[email protected]", 31),
new Employee(222, "Adam Smith", "North Carolina", "[email protected]", 43),
new Employee(333, "Kevin Dunner", "Virginia", "[email protected]", 24),
new Employee(444, "Mike Lauren", "New York", "[email protected]", 41))));
StepVerifier
.create(deleteAndInsert)
.expectNextCount(4)
.verifyComplete();
}
@Test
public void givenRecordsAreInserted_whenDbIsQueried_thenShouldIncludeNewRecords() {
Mono<Long> saveAndCount = repository.count()
.doOnNext(System.out::println)
.thenMany(repository
.saveAll(Flux.just(
new Employee(325, "Kim Jones", "Florida", "[email protected]", 42),
new Employee(654, "Tom Moody", "New Hampshire", "[email protected]", 44))))
.last()
.flatMap(v -> repository.count())
.doOnNext(System.out::println);
StepVerifier
.create(saveAndCount)
.expectNext(6L)
.verifyComplete();
}
@Test
public void givenAgeForFilter_whenDbIsQueried_thenShouldReturnFilteredRecords() {
StepVerifier
.create(repository.findByAgeGreaterThan(35))
.expectNextCount(2)
.verifyComplete();
}
}
5. Conclusion
In summary, we learned how to use reactive types using Spring Data Cassandra to build a non-blocking application.
As always, check out the source code for this tutorial over on GitHub.