Spring Cloud - 使用 Apache Kafka 的流
介绍
在分布式环境中,服务需要相互通信。通信可以同步发生,也可以异步发生。在本节中,我们将了解服务如何通过使用**消息代理**异步通信。
执行异步通信的两个主要好处:
**生产者和消费者的速度可以不同** - 如果数据的消费者速度慢或快,则不会影响生产者的处理,反之亦然。两者都可以以各自的速度工作,而不会相互影响。
**生产者不需要处理来自各种消费者的请求** - 可能有多个消费者希望从生产者读取相同的数据集。通过在两者之间使用消息代理,生产者不需要处理这些消费者产生的负载。此外,生产者级别的任何中断都不会阻止消费者读取旧的生产者数据,因为这些数据将存在于消息代理中。
**Apache Kafka** 和**RabbitMQ** 是两种用于进行异步通信的知名消息代理。在本教程中,我们将使用 Apache Kafka。
Kafka – 依赖项设置
让我们使用我们之前一直在使用的餐厅案例。因此,假设我们的客户服务和餐厅服务通过异步通信进行通信。为此,我们将使用 Apache Kafka。并且我们需要在两个服务中使用它,即客户服务和餐厅服务。
要使用 Apache Kafka,我们将更新这两个服务的 POM 并添加以下依赖项。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
我们还需要运行 Kafka 实例。有多种方法可以做到这一点,但我们更倾向于使用 Docker 容器启动 Kafka。以下是一些我们可以考虑使用的镜像:
无论我们使用哪个镜像,这里需要注意的重要一点是,一旦镜像启动并运行,请确保 Kafka 集群可在**localhost:9092**访问。
现在我们已经在镜像上运行了 Kafka 集群,让我们转到核心示例。
绑定和绑定器
在 Spring Cloud Streams 中,有三个重要的概念:
**外部消息系统** - 这是外部管理的组件,负责存储应用程序生成的事件/消息,这些事件/消息可以被其订阅者/消费者读取。请注意,这不是在应用程序/Spring 中管理的。一些示例包括 Apache Kafka、RabbitMQ
**绑定器** - 这是提供与消息系统集成的组件,例如,包含消息系统的 IP 地址、身份验证等。
**绑定** - 此组件使用绑定器向消息系统发送消息或从特定主题/队列中使用消息。
所有上述属性都在**应用程序属性文件**中定义。
示例
让我们使用我们之前一直在使用的餐厅案例。因此,假设每当向客户服务添加新服务时,我们都希望通知附近的餐厅有关他/她的客户信息。
为此,让我们首先更新我们的客户服务以包含和使用 Kafka。请注意,我们将使用客户服务作为数据的生产者。也就是说,每当我们通过 API 添加客户时,它也将添加到 Kafka 中。
spring:
application:
name: customer-service
cloud:
stream:
source: customerBinding-out-0
kafka:
binder:
brokers: localhost:9092
replicationFactor: 1
bindings:
customerBinding-out-0:
destination: customer
producer:
partitionCount: 3
server:
port: ${app_port}
eureka:
client:
serviceURL:
defaultZone: https://:8900/eureka
**需要注意的事项**:
我们已定义了一个绑定器,其中包含我们本地 Kafka 实例的地址。
我们还定义了绑定“customerBinding-out-0”,它使用“customer”主题输出消息。
我们还在**stream.source**中提到了我们的绑定,以便我们可以在代码中使用它。
完成此操作后,让我们现在通过添加一个负责处理 POST 请求的新方法“addCustomer”来更新我们的控制器。然后,从**post**请求中,我们将数据发送到 Kafka 代理。
package com.tutorialspoint;
import java.util.HashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
class RestaurantCustomerInstancesController {
@Autowired
private StreamBridge streamBridge;
static HashMap<Long, Customer> mockCustomerData = new HashMap();
static{
mockCustomerData.put(1L, new Customer(1, "Jane", "DC"));
mockCustomerData.put(2L, new Customer(2, "John", "SFO"));
mockCustomerData.put(3L, new Customer(3, "Kate", "NY"));
}
@RequestMapping("/customer/{id}")
public Customer getCustomerInfo(@PathVariable("id") Long id) {
System.out.println("Querying customer for id with: " + id);
return mockCustomerData.get(id);
}
@RequestMapping(path = "/customer/{id}", method = RequestMethod.POST)
public Customer addCustomer(@PathVariable("id") Long id) {
// add default name
Customer defaultCustomer = new Customer(id, "Dwayne", "NY");
streamBridge.send("customerBinding-out-0", defaultCustomer);
return defaultCustomer;
}
}
需要注意的事项
我们正在自动装配 StreamBridge,这就是我们将用于发送消息的工具。
我们在“send”方法中使用的参数也指定了我们要用来发送数据的绑定。
现在让我们更新我们的餐厅服务以包含并订阅“customer”主题。请注意,我们将使用餐厅服务作为数据的消费者。也就是说,每当我们通过 API 添加客户时,餐厅服务都会通过 Kafka 了解它。
首先,让我们更新**application.properties**文件。
spring:
application:
name: restaurant-service
cloud:
function:
definition: customerBinding
stream:
kafka:
binder:
brokers: localhost:9092
replicationFactor: 1
bindings:
customerBinding-in-0:
destination: customer
server:
port: ${app_port}
eureka:
client:
serviceURL:
defaultZone: https://:8900/eureka
完成此操作后,让我们现在通过添加一个负责获取请求并提供一个函数的新方法“customerBinding”来更新我们的控制器,该函数将打印请求及其元数据详细信息。
package com.tutorialspoint;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
class RestaurantController {
@Autowired
CustomerService customerService;
@Autowired
private StreamBridge streamBridge;
static HashMap<Long, Restaurant> mockRestaurantData = new HashMap();
static{
mockRestaurantData.put(1L, new Restaurant(1, "Pandas", "DC"));
mockRestaurantData.put(2L, new Restaurant(2, "Indies", "SFO"));
mockRestaurantData.put(3L, new Restaurant(3, "Little Italy", "DC"));
mockRestaurantData.put(4L, new Restaurant(4, "Pizeeria", "NY"));
}
@RequestMapping("/restaurant/customer/{id}")
public List<Restaurant> getRestaurantForCustomer(@PathVariable("id") Long id) {
System.out.println("Got request for customer with id: " + id);
String customerCity = customerService.getCustomerById(id).getCity();
return mockRestaurantData.entrySet().stream().filter(
entry -> entry.getValue().getCity().equals(customerCity))
.map(entry -> entry.getValue())
.collect(Collectors.toList());
}
@RequestMapping("/restaurant/cust/{id}")
public void getRestaurantForCust(@PathVariable("id") Long id) {
streamBridge.send("ordersBinding-out-0", id);
}
@Bean
public Consumer<Message<Customer>> customerBinding() {
return msg -> {
System.out.println(msg);
};
}
}
**需要注意的事项**:
我们正在使用“customerBinding”,它应该传递当此绑定收到消息时将调用的函数。
我们为此函数/bean使用的名称也需要在创建捆绑和指定主题时在 YAML 文件中使用。
现在,让我们像往常一样执行上述代码,启动 Eureka Server。请注意,这不是硬性要求,这里是为了完整性而存在的。
然后,让我们编译并开始使用以下命令更新客户服务:
mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient- 1.0.jar --spring.config.location=classpath:application-kafka.yml
然后,让我们编译并开始使用以下命令更新餐厅服务:
mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client- 1.0.jar --spring.config.location=classpath:application-kafka.yml
我们已设置好了,现在让我们通过访问 API 来测试我们的代码段:
curl -X POST https://:8083/customer/1
这是我们将为此 API 获得的输出:
{
"id": 1,
"name": "Dwayne",
"city": "NY"
}
现在,让我们检查餐厅服务的日志:
GenericMessage [payload=Customer [id=1, name=Dwayne, city=NY],
headers={kafka_offset=1,...
因此,实际上,您可以看到,使用 Kafka 代理,餐厅服务已收到有关新添加的客户的通知。
分区和消费者组
分区和消费者组是您在使用 Spring Cloud Streams 时应该了解的两个重要概念。
**分区** - 用于对数据进行分区,以便我们可以在多个消费者之间分配工作。
让我们看看如何在 Spring Cloud 中对数据进行分区。假设,我们希望根据客户 ID 对数据进行分区。因此,让我们为此更新我们的客户服务。为此,我们需要告诉
让我们更新我们的客户服务应用程序属性以指定我们数据的键。
spring:
application:
name: customer-service
cloud:
function:
definition: ordersBinding
stream:
source: customerBinding-out-0
kafka:
binder:
brokers: localhost:9092
replicationFactor: 1
bindings:
customerBinding-out-0:
destination: customer
producer:
partitionKeyExpression: 'getPayload().getId()'
partitionCount: 3
server:
port: ${app_port}
eureka:
client:
serviceURL:
defaultZone: https://:8900/eureka
为了指定键,即“partitionKeyExpression”,我们提供了 Spring 表达式语言。该表达式假设类型为 GenericMessage
现在,让我们也更新我们的消费者,即餐厅服务,以便在使用请求时记录更多信息。
现在,让我们像往常一样执行上述代码,启动 Eureka Server。请注意,这不是硬性要求,这里是为了完整性而存在的。
然后,让我们编译并开始使用以下命令更新客户服务:
mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient- 1.0.jar --spring.config.location=classpath:application-kafka.yml
然后,让我们编译并开始使用以下命令更新餐厅服务:
mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client- 1.0.jar --spring.config.location=classpath:application-kafka.yml
我们已设置好了,现在让我们测试我们的代码段。作为测试的一部分,我们将执行以下操作:
插入 ID 为 1 的客户:curl -X POST https://:8083/customer/1
插入 ID 为 1 的客户:curl -X POST https://:8083/customer/1
插入 ID 为 1 的客户:curl -X POST https://:8083/customer/5
插入 ID 为 1 的客户:curl -X POST https://:8083/customer/3
插入 ID 为 1 的客户:curl -X POST https://:8083/customer/1
我们不太关心 API 的输出。相反,我们更关心数据发送到的分区。由于我们使用客户 ID 作为键,因此我们预计具有相同 ID 的客户将最终位于同一分区中。
现在,让我们检查餐厅服务的日志:
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 2 Customer: Customer [id=5, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 0 Customer: Customer [id=3, name=Dwayne, city=NY] Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY]
因此,正如我们所看到的,ID 为 1 的客户每次都最终位于同一分区中,即分区 1。
**消费者组** - 消费者组是读取同一主题以实现相同目的的消费者的逻辑分组。主题中的数据在消费者组中的消费者之间进行分区,以便给定消费者组中的只有一个消费者可以读取主题的一个分区。
要定义消费者组,我们只需在我们使用 Kafka 主题名称的绑定中定义一个组即可。例如,让我们在我们的应用程序文件中为我们的控制器定义消费者组名称。
spring:
application:
name: restaurant-service
cloud:
function:
definition: customerBinding
stream:
kafka:
binder:
brokers: localhost:9092
replicationFactor: 1
bindings:
customerBinding-in-0:
destination: customer
group: restController
server:
port: ${app_port}
eureka:
client:
serviceURL:
defaultZone: https://:8900/eureka
让我们重新编译并启动餐厅服务。现在,让我们通过访问客户服务的 POST API 生成事件:
插入 ID 为 1 的客户:curl -X POST https://:8083/customer/1
现在,如果我们检查餐厅服务的日志,我们将看到以下内容:
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: restContoller Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY]
因此,正如我们从输出中看到的,我们创建了一个名为“rest-contoller”的消费者组,其消费者负责读取主题。在上述情况下,我们只有一个服务实例正在运行,因此“customer”主题的所有分区都被分配给同一个实例。但是,如果我们有多个分区,我们将有多个分区分布在工作程序之间。