Managing data consistency in a microservice architecture using Sagas - Implementing an orchestration-based saga
sagas transaction managementThis is the fourth in a series of posts that expands on my MicroCPH talk on Managing data consistency in a microservice architecture using Sagas (slides, video). The previous posts were:
In this post, I describe how to implement orchestration-based sagas.
The orchestration-based Create Order Saga
As I described in part 2, an orchestration-based saga has an orchestrator that tells the saga’s participants what to do. The saga orchestrator communicates with the participants using request/asynchronous response-style interaction. To execute a saga step, it sends a command message to a participant telling it what operation to perform. After the saga participant has performed the operation, it sends a reply message to the orchestrator. The orchestrator then processes the reply message and determines which saga step to perform next.
For example, the following diagram shows the flow for the orchestration-based Create Order Saga
.
The flow is as follows:
- The
Order Service
handles thePOST /orders
request by creating theCreate Order Saga
orchestrator - The saga orchestrator creates an
Order
in thePENDING
state - It then sends a
Reserve Credit
command to theCustomer Service
- The
Customer Service
attempts to reserve credit - It then sends back a reply message indicating the outcome
- The saga orchestrator either approves or rejects the
Order
Implementing an orchestration-based saga
Let’s take a look at how the Order Service
and Customer Service
implement the orchestration version of the Create Order Saga
.
The complete example code is in the eventuate-tram-sagas-examples-customers-and-orders Github repository.
Design overview
The Order Service
defines CreateOrderSaga
class, which orchestrates the saga.
The Customer Service
implements a command handler, which process the command message sent by the Create Order Saga
.
The following diagram shows the design:
About the Eventuate Tram Saga framework
This example code is developed using using Eventuate Tram Sagas, which is a framework for implementing orchestration-based sagas. This framework has three main features:
- Provides a DSL for defining saga orchestrators
- Provides an API for defining command message handlers in saga participants
- Manages the execution of sagas including persisting their state in the database and communicating with saga participants
Let’s first look at how this framework is used by the Order Service
.
The Order Service
The Order Service
consists of the following key classes:
OrderController
- handles thePOST /order
requestOrderService
- creates theCreate Order Saga
SagaInstanceFactory
- provided by the Eventuate Tram Saga framework and used by theOrderService
to instantiate aCreate Order Saga
CreateOrderSaga
- a singleton@Bean
that defines the flow of theCreate Order Saga
CreateOrderSagaData
- theCreate Order Saga
’s persistent data
As with the choreography-based version, the Create Order Saga
is initiated in the Order Service
when the client makes a POST /orders
request.
The OrderController
@Controller
invokes OrderService.createOrder()
.
But rather than creating an Order
and publishing an event, this version of that method creates an instance of the saga orchestrator:
public class OrderService {
@Autowired
private SagaInstanceFactory sagaInstanceFactory;
@Autowired
private CreateOrderSaga createOrderSaga;
@Transactional
public Order createOrder(OrderDetails orderDetails) {
CreateOrderSagaData data = new CreateOrderSagaData(orderDetails);
sagaInstanceFactory.create(createOrderSaga, data);
return orderRepository.findOne(data.getOrderId());
}
The createOrder()
method first creates an CreateOrderSagaData
initialized with the order’s details.
Next, it creates and starts an instance of the Create Order Saga
by calling SagaInstanceFactory.create()
, which is provided by the Eventuate Tram Saga framework.
Finally, createOrder()
method retrieves the Order
created by the first step of the saga.
The CreateOrderSaga
class
The CreateOrderSaga
class defines the flow of the Create Order Saga
.
The following listing shows the key part of this class:
public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {
private SagaDefinition<CreateOrderSagaData> sagaDefinition =
step()
.invokeLocal(this::create)
.withCompensation(this::reject)
.step()
.invokeParticipant(this::reserveCredit)
...
.step()
.invokeLocal(this::approve)
.build();
...
This saga defines three steps, which mirror the definition of the Create Order Saga
in part 1.
Each step consists of a transaction and possibly a compensating transaction.
The Eventuate Tram saga orchestration framework executes the transactions in top to bottom order.
If a transaction fails, it then executes the compensating transactions in bottom to top order.
Before looking at each of the steps in detail, let’s first examine the CreateOrderSagaData
class.
The CreateOrderSagaData
class defines the persistent state of the Create Order Saga
:
public class CreateOrderSagaData {
private OrderDetails orderDetails;
private Long orderId;
private RejectionReason rejectionReason;
public CreateOrderSagaData(OrderDetails orderDetails) {
this.orderDetails = orderDetails;
}
...
}
The OrderService
creates a CreateOrderSagaData
containing the order’s details.
The orderId
and rejectionReason
fields are updated during the execution of the saga.
There is one instance of this class for each instance of the Create Order Saga
.
The Eventuate Tram Saga framework persists each CreateOrderSagaData
instance in the database.
As noted above, the CreateOrderSaga
class defines a saga consisting of three steps.
The first and third steps of the saga perform local updates in the Order Service
.
The first step consists of a transaction that creates an Order
and a compensating transaction that rejects the Order
:
public class CreateOrderSaga
private SagaDefinition<CreateOrderSagaData> sagaDefinition =
step()
.invokeLocal(this::create)
.withCompensation(this::reject)
...;
private void create(CreateOrderSagaData data) {
Order order = Order.createOrder(data.getOrderDetails());
orderRepository.save(order);
data.setOrderId(order.getId());
}
public void reject(CreateOrderSagaData data) {
orderRepository.findById(data.getOrderId()).get().reject(data.getRejectionReason());
}
The create()
method creates an Order
, saves it in the database and records its ID in the CreateOrderSagaData
.
The reject()
method rejects the order and records the reason for its rejection.
The third step of the saga approves the Order
:
public class CreateOrderSaga
private SagaDefinition<CreateOrderSagaData> sagaDefinition =
...
.step()
.invokeLocal(this::approve)
.build();
private void approve(CreateOrderSagaData data) {
orderRepository.findById(data.getOrderId()).get().approve();
}
...
The approve()
method loads the Order
and calls its approve()
method.
The second step of the Create Order Saga
consists of a transaction that reserves credit.
It sends a ReserveCredit
command to the Customer Service
:
public class CreateOrderSaga
private SagaDefinition<CreateOrderSagaData> sagaDefinition =
...
.step()
.invokeParticipant(this::reserveCredit)
.onReply(CustomerNotFound.class, this::handleCustomerNotFound)
.onReply(CustomerCreditLimitExceeded.class, this::handleCustomerCreditLimitExceeded)
... ;
private CommandWithDestination reserveCredit(CreateOrderSagaData data) {
long orderId = data.getOrderId();
Long customerId = data.getOrderDetails().getCustomerId();
Money orderTotal = data.getOrderDetails().getOrderTotal();
return send(new ReserveCreditCommand(customerId, orderId, orderTotal))
.to("customerService")
.build();
}
private void handleCustomerNotFound(CreateOrderSagaData data,
CustomerNotFound reply) {
data.setRejectionReason(RejectionReason.UNKNOWN_CUSTOMER);
}
private void handleCustomerCreditLimitExceeded(
CreateOrderSagaData data, CustomerCreditLimitExceeded reply) {
data.setRejectionReason(RejectionReason.INSUFFICIENT_CREDIT);
}
The reserveCredit()
method returns a CommandWithDestination
, which tells the Eventuate Tram Saga framework which command message so send and to where.
This step of the saga defines two reply handlers, which are invoked by the framework when it receives a reply to the command message.
These methods record why the credit reservation attempt failed.
Here is the @Configuration
class that configures the Create Order Saga
:
@Configuration
...
@Import(SagaOrchestratorConfiguration.class)
public class OrderConfiguration {
@Bean
public OrderService orderService(OrderRepository orderRepository,
SagaInstanceFactory sagaInstanceFactory, CreateOrderSaga createOrderSaga) {
return new OrderService(orderRepository, sagaInstanceFactory, createOrderSaga);
}
@Bean
public CreateOrderSaga createOrderSaga() {
return new CreateOrderSaga();
}
}
This @Configuration
class @Import
s the Eventuate Tram Saga @Configuration
class that configures the @Bean
s for a saga orchestration.
It also defines the orderService
and the createOrderSaga
@Bean
s.
Let’s now look at how the Customer Service
participates in the Create Order Saga
The Customer Service
The Customer Service
participates in the Create Order Saga
by processing Reserve Credit
commands.
The CustomerCommandHandler
class defines the handler for this command:
public class CustomerCommandHandler {
private CustomerService customerService;
public CustomerCommandHandler(CustomerService customerService) {
this.customerService = customerService;
}
public CommandHandlers commandHandlerDefinitions() {
return SagaCommandHandlersBuilder
.fromChannel("customerService")
.onMessage(ReserveCreditCommand.class, this::reserveCredit)
.build();
}
public Message reserveCredit(CommandMessage<ReserveCreditCommand> cm) {
ReserveCreditCommand cmd = cm.getCommand();
try {
customerService.reserveCredit(cmd.getCustomerId(), cmd.getOrderId(), cmd.getOrderTotal());
return withSuccess(new CustomerCreditReserved());
} catch (CustomerNotFoundException e) {
return withFailure(new CustomerNotFound());
} catch (CustomerCreditLimitExceededException e) {
return withFailure(new CustomerCreditLimitExceeded());
}
}
The reserveCredit()
method invokes the CustomerService
to reserve credit.
It returns one of three possible reply messages describing the outcome:
CustomerCreditReserved
- if credit was reservedCustomerCreditLimitExceeded
- if insufficient credit was availableCustomerNotFound
- if the customer was not found
Here is the @Configuration
class that configures the command handler.
@Configuration
...
@Import(SagaParticipantConfiguration.class)
public class CustomerConfiguration {
@Bean
public CustomerCommandHandler customerCommandHandler(CustomerService customerService) {
return new CustomerCommandHandler(customerService);
}
@Bean
public CommandDispatcher consumerCommandDispatcher(CustomerCommandHandler target,
SagaCommandDispatcherFactory sagaCommandDispatcherFactory) {
return sagaCommandDispatcherFactory
.make("customerCommandDispatcher", target.commandHandlerDefinitions());
}
}
The customerCommandHandler
@Bean
instantiates the CustomerCommandHandler
class.
Thee consumerCommandDispatcher
@Bean
instantiates a CommandDispatcher
using the the SagaCommandDispatcherFactory
@Bean
.
The CommandDispatcher
and CommandDispatcher
classes are provided by the Eventuate Tram Saga framework.
The CommandDispatcher
subscribes to the specified channel, dispatches each command message to the appropriate handler and sends back the reply message.
To learn more
- Look at the complete example code in the eventuate-tram-sagas-examples-customers-and-orders Github repository.
- Read my Microservices patterns book, which includes a comprehensive discussion of sagas including the benefits and drawbacks of orchestration-based sagas.
- Read or watch MicroCPH talk on Managing data consistency in a microservice architecture using Sagas (slides, video)
- Talk to me about my microservices consulting and training services.
- Learn more about microservices at adopt.microservices.io