问题描述
对于多播 + 聚合,我有以下奇怪的(或至少我不清楚)行为.考虑以下路线:
I have the following strange (or at least unclear to me) behaviour for a multi-cast + aggregation. Consider the following route:
from("direct:multicaster") .multicast() .to("direct:A", "direct:B") .aggregationStrategy(new AggregationStrategy() { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { List firstResult = newExchange.getIn().getBody(List.class); newExchange.getIn().setBody(ImmutableList.copyOf(firstResult)); return newExchange; } else { List oldResults = oldExchange.getIn().getBody(List.class); List newResults = newExchange.getIn().getBody(List.class); ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults)); oldExchange.getIn().setBody(aggResult); return oldExchange; } } }) .end() // .to("log:bla")
本质上,该路由接受一个输入,将其发送到 direct:A 和 direct:B,从这两个端点获取列表并将它们连接起来(最后一行是有原因的,我稍后会解释).
Essentially, this route takes an input, sends it to direct:A and direct:B, expects lists from these two endpoints and concatenates them (the comment in the last line is there for a reason I will explain later).
现在假设这两个端点返回".列表 [A] 和 [B] 分别.如果我将消息 M 发送到 direct:multicaster,则使用 oldExchange = null 和 newExchange.in 调用一次聚合器.body=[A],然后是 oldExchange.in.body=[A] 和 newExchange.out.body=[B]做).
Now assume that these two endpoints "return" the lists [A] and [B], respectively. If I send the message M to direct:multicaster, then the aggregator is called once with oldExchange = null and newExchange.in.body=[A], then with oldExchange.in.body=[A] and newExchange.out.body=[B] (as it is supposed to do).
到目前为止一切顺利.但是再次使用 oldExchange.in.body=[A,B] 和 newExchange.in=M 调用聚合器(M 是初始消息).这看起来类似于包含的扩充模式.
All good up to this point. But the aggregator is called once more with oldExchange.in.body=[A,B] and newExchange.in=M (M is the initial message). This looks similar to an included enrichment pattern.
您可以通过删除最后一行中的注释来获得预期的行为,即只需添加一个虚拟 to("log:bla").有了这个,一切都按预期运行.
You can get the expected behaviour by removing the comment in the last line, i.e. simply adding a dummy to("log:bla"). With this everthing behaves as expected.
.multicast() .aggregationStrategy(aggStrategy) .to("direct:A", "direct:B") .end()
和
.multicast(aggStrategy) .to("direct:A", "direct:B") .end()
两者都导致相同的行为.
both result in the same behaviour.
这里发生了什么 - 我做错了什么?
What is happening here - what did I get wrong?
提前致谢标记
推荐答案
我尝试重现该问题,但没有成功.这就是我所做的:
I have tried to reproduce the problem, but without success. This is what I did:
路线:
public class MulticastRoute extends RouteBuilder { @Override public void configure() throws Exception { AggregationStrategy myAggregationStrategy = new MyAggregationStrategy(); List<String> listA = Lists.newArrayList("A"); List<String> listB = Lists.newArrayList("B"); from("direct:multicast").routeId("multicastRoute").multicast(myAggregationStrategy).to("direct:A", "direct:B").end(); from("direct:A").setBody(constant(listA)); from("direct:B").setBody(constant(listB)); } class MyAggregationStrategy implements AggregationStrategy { @Override public org.apache.camel.Exchange aggregate(Exchange oldExchange, Exchange newExchange) { System.out.println("Aggregate called with oldExchange = " + (oldExchange == null ? "null" : oldExchange.getIn().getBody().toString()) + ", newExchange = " + newExchange.getIn().getBody().toString()); return newExchange; } } }
创建了一个简单的测试来运行路由.
Created a simple test just to run the route.
测试:
public class MulticastRouteTest extends CamelTestSupport { @Test public void testMulticastRoute() throws Exception { context.addRoutes(new MulticastRoute()); template.sendBody("direct:multicast", null); } }
打印出来:
Aggregate called with oldExchange = null, newExchange = [A] Aggregate called with oldExchange = [A], newExchange = [B]
这是我们所期望的.希望这会帮助你.我看不出我做事的方式有什么不同,但希望你能发现.
This is what we would expect. Hope this will help you. I can not see any difference in the way I do things, but hopefully you will spot it.