问题描述
首先有一个类似的未回答问题将路由加入单个聚合器
First there is a similar unanswered question Joining routes into single aggregator
我们有一些消费者路由(ftp、file、smb)从远程系统读取文件.简化了直接路由的测试,但与批处理消费者的行为相似:
We have some consumer routes (ftp, file, smb) reading files from remote systems. Simplified for test with direct route, but similar behavior with batch consumers:
from("direct:"+routeId).id(routeId) .setProperty(AGGREGATION_PROPERTY, constant(routeId)) .log(String.format("Sending (${body}) to %s", "direct:start1")) .to("direct:aggregate");
转换后,一次投票的所有结果将在单独的路由中按批次聚合:
After transformation all results from one poll are aggregated by batch in a separate route:
from("direct:aggregate") .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy()) .completionFromBatchConsumer() .to("log:result", "mock:result");
如果每个消费者分开运行,一切正常.但如果多个消费者并行运行,聚合将拆分民意调查.例如,如果文件消费者轮询 500 条消息,并且第二条路由开始从 ftp 读取 6 个文件,则期望我们得到 2 个聚合,1 个来自文件的 500 条消息,1 个来自 ftp 的 6 个消息.
All works fine, if every consumer runs separated. But if multiple consumers runs in parallel, aggregation will split the polls. Example if file-consumer polls 500 messages and a second route starts to read 6 files from ftp the expections is that we get 2 aggregates 1 with 500 messages from file and 1 with 6 messages from ftp.
测试用例:
public void testAggregateByProperty() throws Exception { MockEndpoint result = getMockEndpoint("mock:result"); result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z"); template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3); template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3); template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2); template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3); template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2); template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1); template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7); assertMockEndpointsSatisfied(); }
结果是:A+A"、B"、A"、B"、A",而不是预期的A+A+A"、B+B"、A",Z".问题:
The result is: "A+A", "B", "A", "B", "A" and not the expected "A+A+A", "B+B", "A", "Z". Questions:
- 我们对聚合的假设是否错误?
- 我们如何才能实现预期的行为?
- 如果我们设置了completionTimeout,它接缝会从第一次交换发生超时 - 如果还有新的交换,则**?
推荐答案
你几乎可以正常工作了.这是您需要的更改(稍后我会解释).
You almost have it working. Here is the change you need (and after I will explain).
from("direct:aggregate").id("aggregate") .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy()) .completionSize(property(Exchange.BATCH_SIZE)) .to("log:result", "mock:result")
结果将是:
Exchange received, body: A+A+A Exchange received, body: B+B Exchange received, body: A
注意:您不会收到 "Z" 的结果,因为批量大小为 7.
Note: You won't receive a result for the "Z" since the batch size is 7.
解释一下 - 正如您所读到的,聚合器是一个多功能的骆驼组件,正确定义的关键是:
To explain - as you have read, the Aggregator is a versatile camel component and the key things to define correctly are:
- 聚合表达式
- 补全规则
现在,在您的情况下,您正在聚合一个属性 AGGREGATION_PROPERTY,它将是 A、B 或 Z.此外,您正在指定批量大小.
Now in your case you are aggregating on a property AGGREGATION_PROPERTY which will be A, B or Z. In addition you are specifying a batch size.
但是,您没有在路线中表达 completionSize().相反,您使用的是 completionFromBatchConsumer - 它做了一些不同的事情(代码声明它查找 Exchange#BATCH_COMPLETE 属性),因此结果很奇怪.
However you aren't expressing a completionSize() in your route. Instead you were using completionFromBatchConsumer - which does something different (the code states that it looks for a Exchange#BATCH_COMPLETE property), thus the weird results.
无论如何,.completionSize(Exchange.BATCH_SIZE) 将使您的测试按需要运行.
Anyway, .completionSize(Exchange.BATCH_SIZE) will make your test run as desired.
祝你好运.