56. Span 数据作为消息
您可以通过将spring-cloud-sleuth-stream
jar 包含为依赖项来累积和发送 span 数据,并添加 Channel Binder implementation(e.g. spring-cloud-starter-stream-rabbit
用于 RabbitMQ 或spring-cloud-starter-stream-kafka
用于 Kafka)。这将自动将您的应用程序转换为有效负载类型为Spans
的消息的 producer。 spans 将被发送到的 channel name 称为sleuth
。
56.1 Zipkin Consumer
spring-cloud-sleuth-zipkin-stream
已弃用,不应再使用。请使用 OpenZipkin 的 Zipkin 服务器并将环境变量设置为这里是 rabbit(Zipkin 2.4.6)或这里是 kafka(Zipkin 2.4.6)
有一个特殊的便利注释,用于为 Span 数据设置消息 consumer 并将其推送到 Zipkin SpanStore
。这个应用程序
@SpringBootApplication
@EnableZipkinStreamServer
public class Consumer {
public static void main(String[] args) {
SpringApplication.run(Consumer.class, args);
}
}
将通过 Spring Cloud Stream Binder
(e.g. 包含spring-cloud-starter-stream-rabbit
用于 RabbitMQ,并且类似的 starter 存在于 Redis 和 Kafka 中)监听您提供的任何传输的 Span 数据。如果添加以下 UI 依赖项
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure-ui</artifactId>
然后你将拥有一个Zipkin 服务器,它在 port 9411 上托管 UI 和 api。
默认SpanStore
是 in-memory(适用于演示和快速入门)。对于更强大的解决方案,您可以将 MySQL 和spring-boot-starter-jdbc
添加到 classpath 并通过 configuration,e.g 启用 JDBC SpanStore
。:
spring:
rabbitmq:
host: ${RABBIT_HOST:localhost}
datasource:
schema: classpath:/mysql.sql
url: jdbc:mysql://${MYSQL_HOST:localhost}/test
username: root
password: root
# Switch this on to create the schema on startup:
initialize: true
continueOnError: true
sleuth:
enabled: false
zipkin:
storage:
type: mysql
@EnableZipkinStreamServer
也用@EnableZipkinServer
注释,因此 process 还将公开标准的 Zipkin 服务器 endpoints,用于通过 HTTP 收集 spans,以及用于在 Zipkin Web UI 中查询。
56.2 自定义 Consumer
使用spring-cloud-sleuth-stream
和 binding 到SleuthSink
也可以轻松实现自定义 consumer。 例:
@EnableBinding(SleuthSink.class)
@SpringBootApplication(exclude = SleuthStreamAutoConfiguration.class)
@MessageEndpoint
public class Consumer {
@ServiceActivator(inputChannel = SleuthSink.INPUT)
public void sink(Spans input) throws Exception {
// ... process spans
}
}
上面的 sample consumer application 明确排除了
SleuthStreamAutoConfiguration
,因此它不会向自己发送消息,但这是可选的(您可能实际上想要将请求跟踪到 consumer 应用程序中)。
在 order 中自定义轮询机制,您可以使用 name 等于StreamSpanReporter.POLLER
创建PollerMetadata
类型的 bean。在这里,您可以找到这种 configuration 的 example。
@Configuration
public static class CustomPollerConfiguration {
@Bean(name = StreamSpanReporter.POLLER)
PollerMetadata customPoller() {
PollerMetadata poller = new PollerMetadata();
poller.setMaxMessagesPerPoll(500);
poller.setTrigger(new PeriodicTrigger(5000L));
return poller;
}
}