{"id":257379,"date":"2022-12-08T08:45:27","date_gmt":"2022-12-08T00:45:27","guid":{"rendered":"https:\/\/gulass.cn\/?p=257379"},"modified":"2022-11-14T22:46:21","modified_gmt":"2022-11-14T14:46:21","slug":"retry-kafka-spring","status":"publish","type":"post","link":"https:\/\/gulass.cn\/retry-kafka-spring.html","title":{"rendered":"\u91cd\u8bd5\u673a\u5236\u4e3aKafka\u5e26\u6765\u6625\u5929"},"content":{"rendered":"\n\n\n
\u5bfc\u8bfb<\/td>\n\u6700\u8fd1\u4e1a\u52a1\u4e0a\u7528\u5230\u4e86Spring Kafka\uff0c\u6240\u4ee5\u7cfb\u7edf\u6027\u7684\u63a2\u7d22\u4e86\u4e0bSpring Kafka\u7684\u5404\u79cd\u7528\u6cd5\uff0c\u53d1\u73b0\u4e86\u5f88\u591a\u5b9e\u7528\u7684\u7279\u6027\uff0c\u4e0b\u9762\u4ecb\u7ecd\u4e0bSpring Kafka\u7684\u6d88\u606f\u91cd\u8bd5\u673a\u5236\u3002<\/strong><\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n

\u54c8\u55bd\uff0c\u5927\u5bb6\u597d\uff0c\u6211\u662f\u6307\u5317\u541b\u3002<\/p>\n

\u6700\u8fd1\u4e1a\u52a1\u4e0a\u7528\u5230\u4e86Spring Kafka\uff0c\u6240\u4ee5\u7cfb\u7edf\u6027\u7684\u63a2\u7d22\u4e86\u4e0bSpring Kafka\u7684\u5404\u79cd\u7528\u6cd5\uff0c\u53d1\u73b0\u4e86\u5f88\u591a\u5b9e\u7528\u7684\u7279\u6027\uff0c\u4e0b\u9762\u4ecb\u7ecd\u4e0bSpring Kafka\u7684\u6d88\u606f\u91cd\u8bd5\u673a\u5236\u3002<\/p>\n

0. \u524d\u8a00<\/strong><\/div>\n

\u539f\u751f Kafka \u662f\u4e0d\u652f\u6301\u6d88\u606f\u91cd\u8bd5\u7684\u3002\u4f46\u662f Spring Kafka 2.7+ \u5c01\u88c5\u4e86 Retry Topic \u8fd9\u4e2a\u529f\u80fd\u3002<\/p>\n

1. @RetryableTopic<\/strong><\/div>\n

\u4f7f\u7528\u6ce8\u89e3\u7684\u65b9\u5f0f\u542f\u7528 Retry Topic\uff0c\u5728 @KafkaListener \u65b9\u6cd5\u4e0a\u6dfb\u52a0 @RetryableTopic \u5373\u53ef\uff1a<\/p>\n

\r\n@Slf4j\r\n@Component\r\npublic class DemoConsumer {\r\n    @RetryableTopic\r\n    @KafkaListener(topics = \"topic1\", groupId = \"group1\")\r\n    public void onMsg(ConsumerRecord record){\r\n        log.info(\"topic: {}\", record.topic());\r\n        throw new RuntimeException(\"kafka exception\");\r\n    }\r\n\r\n}<\/pre>\n

\u8fd9\u6837\u5c31\u5f00\u542f\u4e86 Spring Kafka \u7684\u6d88\u606f\u91cd\u8bd5\u673a\u5236\uff1a\u9ed8\u8ba4\u6700\u591a\u5c1d\u8bd5 3 \u6b21\uff08\u542b\u9996\u6b21\u6d88\u8d39\uff09\uff0c\u95f4\u9694\u4e3a 1 \u79d2\u3002<\/p>\n

\u6211\u4eec\u5728\u65b9\u6cd5\u91cc\u6a21\u62df\u4e86\u629b\u51fa\u5f02\u5e38\uff0c\u8fd0\u884c\u540e\u53ef\u4ee5\u53d1\u73b0\u6253\u5370\u4e86 3 \u6761\u65e5\u5fd7\uff0c\u95f4\u9694\u65f6\u95f4\u5927\u7ea6\u4e3a 1 \u79d2\uff0c\u91cd\u8bd5\u7684topic\u4e3a\u539ftopic\u52a0\u4e0a\u540e\u7f00\u201c-retry-N\u201d\uff08\u672c\u4f8b\u4e2d N \u4e3a\u4ece 0 \u5f00\u59cb\u7684\u5e8f\u53f7\uff09\u3002<\/p>\n

\r\n2022-11-12 12:14:10.230  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1\r\n2022-11-12 12:14:11.315  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-0  \r\n2022-11-12 12:14:12.310  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-1<\/pre>\n
2. DLT\u6b7b\u4fe1\u961f\u5217<\/strong><\/div>\n

\u5982\u679c 3 \u6b21\u5c1d\u8bd5\u540e\u4f9d\u65e7\u5931\u8d25\uff0c\u4f1a\u5c06\u6d88\u606f\u53d1\u9001\u5230 DLT\uff08Dead Letter Topic\uff0c\u6b7b\u4fe1\u961f\u5217\uff09\u3002<\/p>\n

\u9ed8\u8ba4\u60c5\u51b5\uff0c\u6d88\u606f\u88ab\u53d1\u9001\u5230\u6b7b\u4fe1\u961f\u5217\u540e\uff0c\u4f1a\u8f93\u51fa\u4e00\u6761\u65e5\u5fd7\u3002<\/p>\n

\r\n2022-11-12 12:14:13.324  INFO 1023 --- [ner#3-dlt-0-C-1] o.s.k.retrytopic.RetryTopicConfigurer    : Received message in dlt listener: topic1-dlt@233\r\n<\/pre>\n

DLT\u7684topic\u4e3a\u539ftopic\u52a0\u4e0a\u540e\u7f00\u201c-dlt\u201d<\/p>\n

\u6211\u4eec\u53ef\u4ee5\u4f7f\u7528@DltHandler\u6ce8\u89e3\u6765\u5b9a\u4e49\u8fdb\u5165\u6b7b\u4fe1\u961f\u5217\u540e\u7684\u64cd\u4f5c\uff1a<\/p>\n

\r\n@DltHandler\r\npublic void dltHandler(ConsumerRecord record){\r\n    log.info(\"topic:{}, key:{}, value:{}\", record.topic(), record.key(), record.value());\r\n}<\/pre>\n
3. \u81ea\u5b9a\u4e49@RetryableTopic<\/strong><\/div>\n

\u53ef\u4ee5\u81ea\u5b9a\u4e49\u91cd\u8bd5\u6b21\u6570\u3001\u5ef6\u8fdf\u65f6\u95f4\u3001topic\u547d\u540d\u7b56\u7565\u7b49\u7b49\uff0c\u652f\u6301\u4f7f\u7528 Spring EL \u8868\u8fbe\u5f0f\u8bfb\u53d6\u914d\u7f6e\u3002<\/p>\n

\r\n@Slf4j\r\n@Component\r\npublic class DemoConsumer {\r\n    @RetryableTopic(\r\n            attempts = \"4\",\r\n            backoff = @Backoff(delay = \"5000\", multiplier = \"2\"),\r\n            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC\r\n    )\r\n    @KafkaListener(topics = \"topic2\", groupId = \"group1\")\r\n    public void onMsg2(ConsumerRecord record){\r\n        log.info(\"topic: {}\", record.topic());\r\n        throw new RuntimeException(\"kafka exception\");\r\n    }\r\n\r\n}<\/pre>\n

\u6ce8\u89e3\u5c5e\u6027\u8bf4\u660e\uff1a<\/p>\n

attempts\uff1a\u6700\u5927\u5c1d\u8bd5\u6b21\u6570\uff08\u5305\u542b\u9996\u6b21\u6d88\u8d39\uff09\uff0c\u9ed8\u8ba4\u4e3a3\u3002<\/p>\n

@Backoff delay\uff1a\u6d88\u8d39\u5ef6\u8fdf\u65f6\u95f4\uff0c\u5355\u4f4d\u4e3a\u6beb\u79d2\u3002<\/p>\n

@Backoff multiplier\uff1a\u5ef6\u8fdf\u65f6\u95f4\u7cfb\u6570\uff0c\u6b64\u4f8b\u4e2d attempts = 4\uff0c delay = 5000\uff0c multiplier = 2 \uff0c\u5219\u5171\u91cd\u8bd5 3 \u6b21\uff08attempts \u5305\u542b\u9996\u6b21\u6d88\u8d39\uff09\uff0c\u95f4\u9694\u65f6\u95f4\u4f9d\u6b21\u4e3a5s\u300110s\u300120s\uff0c\u6700\u5927\u5ef6\u8fdf\u65f6\u95f4\u53d7 maxDelay \u9650\u5236\u3002<\/p>\n

fixedDelayTopicStrategy\uff1a\u53ef\u9009\u7b56\u7565\u5305\u62ec\uff1aSINGLE_TOPIC \u3001MULTIPLE_TOPICS<\/p>\n

4. \u914d\u7f6e\u7c7b<\/strong><\/div>\n

\u4ee5\u4e0a\u4ecb\u7ecd\u7684\u662f\u6ce8\u89e3\u7684\u65b9\u5f0f\uff0c\u53ea\u5bf9\u6ce8\u89e3\u4e0b\u7684\u65b9\u6cd5\u6709\u6548\u3002\u5982\u679c\u60f3\u8ba9\u591a\u4e2a\u65b9\u6cd5\u90fd\u7528\u76f8\u540c\u7684\u6d88\u606f\u91cd\u8bd5\u914d\u7f6e\uff0c\u90a3\u4e48\u53ef\u4ee5\u4f7f\u7528\u914d\u7f6e\u7c7b\u65b9\u5f0f\uff1a<\/p>\n

\r\n@Bean\r\npublic RetryTopicConfiguration retryTopic(KafkaTemplate template){\r\n    return RetryTopicConfigurationBuilder\r\n            .newInstance()\r\n            .maxAttempts(4)\r\n            .fixedBackOff(5000)\r\n            .includeTopic(\"topic1\")\r\n            .create(template);\r\n}<\/pre>\n
\u5c0f\u7ed3<\/strong><\/div>\n

\u4ee5\u4e0a\u5c31\u662fSpring Kafka\u6d88\u606f\u91cd\u8bd5\u673a\u5236\u7684\u7b80\u5355\u5e94\u7528~\u5e0c\u671b\u80fd\u591f\u5e2e\u52a9\u90a3\u4e9b\u6b63\u5728\u4f7f\u7528Spring Kafka\u6216\u5373\u5c06\u4f7f\u7528\u7684\u4eba\u5c11\u8d70\u4e00\u4e9b\u5f2f\u8def\u3001\u5c11\u8e29\u4e00\u70b9\u5751\u3002<\/p>\n","protected":false},"excerpt":{"rendered":"

\u54c8\u55bd\uff0c\u5927\u5bb6\u597d\uff0c\u6211\u662f\u6307\u5317\u541b\u3002 \u6700\u8fd1\u4e1a\u52a1\u4e0a\u7528\u5230\u4e86Spring Kafka\uff0c\u6240\u4ee5\u7cfb\u7edf\u6027\u7684\u63a2\u7d22\u4e86\u4e0bSpring Kaf […]<\/p>\n","protected":false},"author":317,"featured_media":202463,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[55],"tags":[],"class_list":["post-257379","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-thread"],"acf":[],"_links":{"self":[{"href":"https:\/\/gulass.cn\/wp-json\/wp\/v2\/posts\/257379","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/gulass.cn\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/gulass.cn\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/gulass.cn\/wp-json\/wp\/v2\/users\/317"}],"replies":[{"embeddable":true,"href":"https:\/\/gulass.cn\/wp-json\/wp\/v2\/comments?post=257379"}],"version-history":[{"count":1,"href":"https:\/\/gulass.cn\/wp-json\/wp\/v2\/posts\/257379\/revisions"}],"predecessor-version":[{"id":257380,"href":"https:\/\/gulass.cn\/wp-json\/wp\/v2\/posts\/257379\/revisions\/257380"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/gulass.cn\/wp-json\/wp\/v2\/media\/202463"}],"wp:attachment":[{"href":"https:\/\/gulass.cn\/wp-json\/wp\/v2\/media?parent=257379"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/gulass.cn\/wp-json\/wp\/v2\/categories?post=257379"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/gulass.cn\/wp-json\/wp\/v2\/tags?post=257379"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}