
Integrating the Spring Integration Java DSL with Spring Boot: Example Code

October 5, 2022 · me-sa

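The Spring Integration Java DSL has been merged into Spring Integration Core 5.0, which is a smart and obvious move because:

  • Everyone starting a new Spring project based on Java Config uses it
  • The SI Java DSL lets you use powerful newer Java 8 features such as lambdas
  • You can build flows with the Builder pattern, based on IntegrationFlowBuilder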
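
To make the builder-plus-lambdas idea concrete without pulling in any Spring classes, here is a toy sketch in plain Java. MiniFlow and its methods are hypothetical names invented for this illustration. They are not part of Spring Integration and only mimic the general shape of a fluent flow builder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical toy builder; illustrates the pattern only, not the Spring Integration API. */
final class MiniFlow {
    private final List<Function<Object, Object>> steps = new ArrayList<>();

    static MiniFlow start() {
        return new MiniFlow();
    }

    /** Adds a transformation step expressed as a lambda or method reference. */
    @SuppressWarnings("unchecked")
    <I, O> MiniFlow transform(Function<I, O> step) {
        steps.add(in -> step.apply((I) in));
        return this;
    }

    /** Builds the flow: one function that applies all steps in the order they were added. */
    Function<Object, Object> build() {
        List<Function<Object, Object>> copy = new ArrayList<>(steps);
        return payload -> {
            Object current = payload;
            for (Function<Object, Object> step : copy) {
                current = step.apply(current);
            }
            return current;
        };
    }

    public static void main(String[] args) {
        Function<Object, Object> flow = MiniFlow.start()
                .<String, String>transform(String::trim)
                .<String, String>transform(String::toLowerCase)
                .build();
        System.out.println(flow.apply("  HELLO JMS  ")); // prints "hello jms"
    }
}
```

The explicit type witnesses such as <String, String> play the same role as the ones you will see on transform and route in the DSL examples later in this article.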

Let's see how to use it in an example based on ActiveMQ JMS.

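Maven dependencies (the separate spring-integration-java-dsl artifact is only needed on Spring Integration 4.x; from 5.0 onward the DSL ships inside spring-integration-core):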

 
<dependencies> 
  <dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-activemq</artifactId> 
  </dependency> 
 
  <dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-core</artifactId> 
  </dependency> 
 
  <dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-jms</artifactId> 
  </dependency> 
 
  <dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-test</artifactId> 
    <scope>test</scope> 
  </dependency> 
 
  <dependency> 
    <groupId>org.apache.activemq</groupId> 
    <artifactId>activemq-kahadb-store</artifactId> 
  </dependency> 
 
  <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl --> 
  <dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-java-dsl</artifactId> 
    <version>1.2.3.RELEASE</version> 
  </dependency> 
</dependencies>

Example 1: JMS inbound gateway

We have the following service activator:

 
@Service 
public class ActiveMQEndpoint { 
  @ServiceActivator(inputChannel = "inboundChannel") 
  public void processMessage(final String inboundPayload) { 
    System.out.println("Inbound message: "+inboundPayload); 
  } 
} 

If you want to use the SI Java DSL to deliver inboundPayload from a JMS queue to this activator in gateway style, use the DSL Jms factory:

 
@Bean 
public DynamicDestinationResolver dynamicDestinationResolver() { 
  return new DynamicDestinationResolver(); 
} 
 
@Bean 
public ActiveMQConnectionFactory connectionFactory() { 
  return new ActiveMQConnectionFactory(); 
} 
 
@Bean 
public DefaultMessageListenerContainer listenerContainer() { 
  final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer(); 
  defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver()); 
  defaultMessageListenerContainer.setConnectionFactory(connectionFactory()); 
  defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test"); 
  return defaultMessageListenerContainer; 
} 
 
@Bean 
public MessageChannel inboundChannel() { 
  return MessageChannels.direct("inboundChannel").get(); 
} 
 
@Bean 
public JmsInboundGateway dataEndpoint() { 
  return Jms.inboundGateway(listenerContainer()) 
      .requestChannel(inboundChannel()).get(); 
} 
 

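The JmsInboundGatewaySpec returned by Jms.inboundGateway, from which the dataEndpoint bean is built, also lets you send replies to an SI channel or a JMS destination. See the documentation.

Example 2: JMS message-driven channel adapter

If you are looking for a replacement for the XML JMS configuration of a message-driven channel adapter, JmsMessageDrivenChannelAdapter is the way to go: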

 
@Bean 
public DynamicDestinationResolver dynamicDestinationResolver() { 
  return new DynamicDestinationResolver(); 
} 
 
@Bean 
public ActiveMQConnectionFactory connectionFactory() { 
  return new ActiveMQConnectionFactory(); 
} 
 
@Bean 
public DefaultMessageListenerContainer listenerContainer() { 
  final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer(); 
  defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver()); 
  defaultMessageListenerContainer.setConnectionFactory(connectionFactory()); 
  defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test"); 
  return defaultMessageListenerContainer; 
} 
 
@Bean 
public MessageChannel inboundChannel() { 
  return MessageChannels.direct("inboundChannel").get(); 
} 
 
@Bean 
public JmsMessageDrivenChannelAdapter dataEndpoint() { 
  final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener = 
      new ChannelPublishingJmsMessageListener(); 
  channelPublishingJmsMessageListener.setExpectReply(false); 
  final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new 
      JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener 
      ); 
 
  messageDrivenChannelAdapter.setOutputChannel(inboundChannel()); 
  return messageDrivenChannelAdapter; 
} 
 

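As in the previous example, the inbound payload is sent to the activator shown in Example 1.

Example 3: JMS message-driven channel adapter with JAXB

In a typical scenario you want to accept XML as a text message over JMS, convert it into a JAXB stub, and process it in a service activator. I will show you how to do this with the SI Java DSL, but first let's add two dependencies for XML processing: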

 
<dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-xml</artifactId> 
  </dependency> 
 
  <dependency> 
    <groupId>org.springframework</groupId> 
    <artifactId>spring-oxm</artifactId> 
  </dependency> 
 

We will accept ship orders over JMS, so first create an XSD named shiporder.xsd:

 
<?xml version="1.0" encoding="UTF-8" ?> 
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"> 
 
  <xs:element name="shiporder"> 
    <xs:complexType> 
      <xs:sequence> 
        <xs:element name="orderperson" type="xs:string"/> 
        <xs:element name="shipto"> 
          <xs:complexType> 
            <xs:sequence> 
              <xs:element name="name" type="xs:string"/> 
              <xs:element name="address" type="xs:string"/> 
              <xs:element name="city" type="xs:string"/> 
              <xs:element name="country" type="xs:string"/> 
            </xs:sequence> 
          </xs:complexType> 
        </xs:element> 
        <xs:element name="item" maxOccurs="unbounded"> 
          <xs:complexType> 
            <xs:sequence> 
              <xs:element name="title" type="xs:string"/> 
              <xs:element name="note" type="xs:string" minOccurs="0"/> 
              <xs:element name="quantity" type="xs:positiveInteger"/> 
              <xs:element name="price" type="xs:decimal"/> 
            </xs:sequence> 
          </xs:complexType> 
        </xs:element> 
      </xs:sequence> 
      <xs:attribute name="orderid" type="xs:string" use="required"/> 
    </xs:complexType> 
  </xs:element> 
 
</xs:schema> 
 

Add the JAXB Maven plugin to generate the JAXB stubs:

 
 <plugin> 
      <groupId>org.codehaus.mojo</groupId> 
      <artifactId>jaxb2-maven-plugin</artifactId> 
      <version>2.3.1</version> 
      <executions> 
        <execution> 
          <id>xjc-schema1</id> 
          <goals> 
            <goal>xjc</goal> 
          </goals> 
          <configuration> 
            <!-- Use all XSDs under the west directory for sources here. --> 
            <sources> 
              <source>src/main/resources/xsds/shiporder.xsd</source> 
            </sources> 
 
            <!-- Package name of the generated sources. --> 
            <packageName>com.example.stubs</packageName> 
            <outputDirectory>src/main/java</outputDirectory> 
            <clearOutputDir>false</clearOutputDir> 
          </configuration> 
        </execution> 
      </executions> 
    </plugin>

With the stub classes and everything else in place, here is the Java DSL JMS message-driven adapter with the JAXB magic:

 
/** 
 * Sample 3: Jms message driven adapter with JAXB 
 */ 
@Bean 
public JmsMessageDrivenChannelAdapter dataEndpoint() { 
  final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener = 
      new ChannelPublishingJmsMessageListener(); 
  channelPublishingJmsMessageListener.setExpectReply(false); 
  channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller())); 
  final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new 
      JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener 
  ); 
 
  messageDrivenChannelAdapter.setOutputChannel(inboundChannel()); 
  return messageDrivenChannelAdapter; 
} 
 
@Bean 
public Jaxb2Marshaller shipOrdersMarshaller() { 
  Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); 
  marshaller.setContextPath("com.example.stubs"); 
  return marshaller; 
}

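Compared with XML configuration, doing this in Java gives you a great deal of power and flexibility. To complete the example, the service activator for inboundChannel looks like this: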

 
/** 
 * Sample 3 
 * @param shiporder 
 */ 
@ServiceActivator(inputChannel = "inboundChannel") 
public void processMessage(final Shiporder shiporder) { 
  System.out.println(shiporder.getOrderid()); 
  System.out.println(shiporder.getOrderperson()); 
} 

To test the flow, you can send the following XML to the JMS queue, for example through JConsole:

 
<?xml version="1.0" encoding="UTF-8"?>
  <shiporder orderid="889923" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:noNamespaceSchemaLocation="shiporder.xsd"> 
   <orderperson>John Smith</orderperson> 
    <shipto> 
     <name>Ola Nordmann</name> 
     <address>Langgt 23</address> 
     <city>4000 Stavanger</city> 
     <country>Norway</country> 
    </shipto> 
    <item> 
     <title>Empire Burlesque</title> 
     <note>Special Edition</note> 
     <quantity>1</quantity> 
     <price>10.90</price> 
     </item> 
    <item> 
     <title>Hide your heart</title> 
     <quantity>1</quantity> 
     <price>9.90</price> 
    </item> 
  </shiporder>
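
Before sending a test message, it can help to check it against the schema so that a malformed payload is not mistaken for a problem in the flow. The following is a minimal sketch using only the JDK's javax.xml.validation API. The class name ShiporderXmlCheck is hypothetical, and the inline schema is a deliberately shortened stand-in for shiporder.xsd (only orderperson and the required orderid attribute), so treat it as an assumption rather than the real schema.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

/** Hypothetical helper: validates an XML string against a shortened shiporder schema. */
final class ShiporderXmlCheck {

    // Assumption: a trimmed stand-in for shiporder.xsd, kept short for the example.
    static final String XSD =
            "<xs:schema xmlns:xs=\"http://www.w3.org/2001/XMLSchema\">"
          + "<xs:element name=\"shiporder\">"
          + "<xs:complexType>"
          + "<xs:sequence>"
          + "<xs:element name=\"orderperson\" type=\"xs:string\"/>"
          + "</xs:sequence>"
          + "<xs:attribute name=\"orderid\" type=\"xs:string\" use=\"required\"/>"
          + "</xs:complexType>"
          + "</xs:element>"
          + "</xs:schema>";

    /** Returns true if the XML is well-formed and valid against XSD, false otherwise. */
    static boolean isValid(String xml) {
        try {
            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = factory.newSchema(new StreamSource(new StringReader(XSD)));
            // Without a custom ErrorHandler, validation errors are thrown as SAXException.
            schema.newValidator().validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException | IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String ok = "<shiporder orderid=\"889923\"><orderperson>John Smith</orderperson></shiporder>";
        String missingId = "<shiporder><orderperson>John Smith</orderperson></shiporder>";
        System.out.println(isValid(ok));        // prints "true"
        System.out.println(isValid(missingId)); // prints "false"
    }
}
```

In a real project you would point the StreamSource at src/main/resources/xsds/shiporder.xsd instead of the inline string.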

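Example 4: JMS message-driven channel adapter with JAXB and payload root routing

Another typical case is to accept XML as a JMS text message, convert it into a JAXB stub, and route the payload to a service activator according to the payload root type. The SI Java DSL of course supports all kinds of routing; I will show you how to route by payload type.

First, add the following XSD to the folder that contains shiporder.xsd and name it purchaseorder.xsd: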

 
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema" 
      xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd" 
      targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd" 
      elementFormDefault="qualified"> 
  <xsd:element name="PurchaseOrder"> 
    <xsd:complexType> 
      <xsd:sequence> 
        <xsd:element name="ShipTo" type="tns:USAddress" maxOccurs="2"/> 
        <xsd:element name="BillTo" type="tns:USAddress"/> 
      </xsd:sequence> 
      <xsd:attribute name="OrderDate" type="xsd:date"/> 
    </xsd:complexType> 
  </xsd:element> 
 
  <xsd:complexType name="USAddress"> 
    <xsd:sequence> 
      <xsd:element name="name"  type="xsd:string"/> 
      <xsd:element name="street" type="xsd:string"/> 
      <xsd:element name="city"  type="xsd:string"/> 
      <xsd:element name="state" type="xsd:string"/> 
      <xsd:element name="zip"  type="xsd:integer"/> 
    </xsd:sequence> 
    <xsd:attribute name="country" type="xsd:NMTOKEN" fixed="US"/> 
  </xsd:complexType> 
</xsd:schema> 
 

Then add it to the JAXB Maven plugin configuration:

 
 <plugin> 
      <groupId>org.codehaus.mojo</groupId> 
      <artifactId>jaxb2-maven-plugin</artifactId> 
      <version>2.3.1</version> 
      <executions> 
        <execution> 
          <id>xjc-schema1</id> 
          <goals> 
            <goal>xjc</goal> 
          </goals> 
          <configuration> 
            <!-- Use all XSDs under the west directory for sources here. --> 
            <sources> 
              <source>src/main/resources/xsds/shiporder.xsd</source> 
              <source>src/main/resources/xsds/purchaseorder.xsd</source> 
            </sources> 
 
            <!-- Package name of the generated sources. --> 
            <packageName>com.example.stubs</packageName> 
            <outputDirectory>src/main/java</outputDirectory> 
            <clearOutputDir>false</clearOutputDir> 
          </configuration> 
        </execution> 
      </executions> 
    </plugin>

Run mvn clean install to generate the JAXB stubs for the new XSD. Now for the promised payload root mapping:

 
@Bean 
public Jaxb2Marshaller ordersMarshaller() { 
  Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); 
  marshaller.setContextPath("com.example.stubs"); 
  return marshaller; 
} 
 
/** 
 * Sample 4: Jms message driven adapter with Jaxb and Payload routing. 
 * @return 
 */ 
@Bean 
public JmsMessageDrivenChannelAdapter dataEndpoint() { 
  final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener = 
      new ChannelPublishingJmsMessageListener(); 
  channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(ordersMarshaller())); 
  final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new 
      JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener 
  ); 
 
  messageDrivenChannelAdapter.setOutputChannel(inboundChannel()); 
  return messageDrivenChannelAdapter; 
} 
 
@Bean 
public IntegrationFlow payloadRootMapping() { 
  return IntegrationFlows.from(inboundChannel()).<Object, Class<?>>route(Object::getClass, m->m 
      .subFlowMapping(Shiporder.class, sf->sf.handle((MessageHandler) message -> { 
        final Shiporder shiporder = (Shiporder) message.getPayload(); 
        System.out.println(shiporder.getOrderperson()); 
        System.out.println(shiporder.getOrderid()); 
      })) 
      .subFlowMapping(PurchaseOrder.class, sf->sf.handle((MessageHandler) message -> { 
        final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload(); 
        System.out.println(purchaseOrderType.getBillTo().getName()); 
      })) 
  ).get(); 
} 
 

Note the payloadRootMapping bean. Let's go through the important parts:

  • <Object, Class<?>> route - the input from inboundChannel is an Object, and routing is performed on its Class<?>
  • subFlowMapping(Shiporder.class, ...) - handles Shiporder payloads.
  • subFlowMapping(PurchaseOrder.class, ...) - handles PurchaseOrder payloads.
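
The idea behind routing on the payload class can be sketched in plain Java, independent of Spring. TypeRouter, ShiporderStub and PurchaseOrderStub below are hypothetical names for this illustration; the stub classes merely stand in for the generated JAXB classes, and the sketch is not how Spring Integration implements its router.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper: a plain-Java analogue of routing by payload class. */
final class TypeRouter {

    // Stand-ins for the generated JAXB stubs (assumption, for illustration only).
    static final class ShiporderStub {
        final String orderid;
        ShiporderStub(String orderid) { this.orderid = orderid; }
    }

    static final class PurchaseOrderStub {
        final String billToName;
        PurchaseOrderStub(String billToName) { this.billToName = billToName; }
    }

    private final Map<Class<?>, Function<Object, String>> routes = new LinkedHashMap<>();

    /** Registers a handler for one payload type, similar in spirit to a sub-flow mapping. */
    <T> TypeRouter mapping(Class<T> type, Function<T, String> handler) {
        routes.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    /** Looks up the handler by the payload's exact runtime class and invokes it. */
    String route(Object payload) {
        Function<Object, String> handler = routes.get(payload.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("No mapping for " + payload.getClass().getName());
        }
        return handler.apply(payload);
    }

    public static void main(String[] args) {
        TypeRouter router = new TypeRouter()
                .mapping(ShiporderStub.class, o -> "shiporder " + o.orderid)
                .mapping(PurchaseOrderStub.class, o -> "purchase order for " + o.billToName);
        System.out.println(router.route(new ShiporderStub("889923")));    // prints "shiporder 889923"
        System.out.println(router.route(new PurchaseOrderStub("name1"))); // prints "purchase order for name1"
    }
}
```

This sketch matches on the exact runtime class only, so a subclass of a mapped type would not be routed; an unmapped payload fails fast with an exception.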

To test the Shiporder payload, use the XML from Example 3; to test the PurchaseOrder payload, use the following XML:

 
<?xml version="1.0" encoding="utf-8"?>  
<PurchaseOrder OrderDate="1900-01-01" xmlns="http://tempuri.org/PurchaseOrderSchema.xsd">  
 <ShipTo country="US">  
  <name>name1</name>  
  <street>street1</street>  
  <city>city1</city>  
  <state>state1</state>  
  <zip>1</zip>  
 </ShipTo>  
 <ShipTo country="US">  
  <name>name2</name>  
  <street>street2</street>  
  <city>city2</city>  
  <state>state2</state>  
  <zip>-79228162514264337593543950335</zip>  
 </ShipTo>  
 <BillTo country="US">  
  <name>name1</name>  
  <street>street1</street>  
  <city>city1</city>  
  <state>state1</state>  
  <zip>1</zip>  
 </BillTo>  
</PurchaseOrder>

Both payloads should be routed according to the subflow mappings.
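
One detail of the PurchaseOrder test message is worth a note: the second ShipTo carries a very large negative zip value. That is still valid because xsd:integer is unbounded, and JAXB's default binding maps it to java.math.BigInteger, so the generated getter should return a BigInteger rather than an int or long. The small sketch below, with the hypothetical class name ZipValueCheck, shows why a primitive type would not be enough.

```java
import java.math.BigInteger;

/** Hypothetical helper: checks whether an xsd:integer lexical value fits into a Java long. */
final class ZipValueCheck {

    static boolean fitsInLong(String lexical) {
        try {
            // longValueExact throws ArithmeticException when the value is out of long range.
            new BigInteger(lexical).longValueExact();
            return true;
        } catch (ArithmeticException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(fitsInLong("1"));                              // prints "true"
        System.out.println(fitsInLong("-79228162514264337593543950335")); // prints "false"
    }
}
```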

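Example 5: IntegrationFlowAdapter

Besides the other implementations of Enterprise Integration Patterns (check them out), I need to mention IntegrationFlowAdapter. You extend this class and implement its buildFlow method, for example: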

 
@Component 
public class MyFlowAdapter extends IntegrationFlowAdapter { 
 
  @Autowired 
  private ConnectionFactory rabbitConnectionFactory; 
 
  @Override 
  protected IntegrationFlowDefinition<?> buildFlow() { 
    return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue")) 
        .<String, String>transform(String::toLowerCase) 
        .channel(c -> c.queue("myFlowAdapterOutput")); 
  } 
} 
 

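This way you can wrap repeated bean declarations into a single component and give them the flow they need. Such a component can then be configured and handed to the calling code as one class instance.

So let's make Example 3 from this repo a bit shorter: define a base class for all JMS endpoints and declare the repeated beans in it: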

 
public class JmsEndpoint extends IntegrationFlowAdapter { 
 
  private String queueName; 
 
  private String channelName; 
 
  private String contextPath; 
 
  /** 
   * @param queueName 
   * @param channelName 
   * @param contextPath 
   */ 
  public JmsEndpoint(String queueName, String channelName, String contextPath) { 
    this.queueName = queueName; 
    this.channelName = channelName; 
    this.contextPath = contextPath; 
  } 
 
  @Override 
  protected IntegrationFlowDefinition<?> buildFlow() { 
    return from(Jms.messageDrivenChannelAdapter(listenerContainer()) 
      .jmsMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller())) 
    ).channel(channelName); 
  } 
 
  @Bean 
  public Jaxb2Marshaller shipOrdersMarshaller() { 
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); 
    marshaller.setContextPath(contextPath); 
    return marshaller; 
  } 
 
  @Bean 
  public DynamicDestinationResolver dynamicDestinationResolver() { 
    return new DynamicDestinationResolver(); 
  } 
 
  @Bean 
  public ActiveMQConnectionFactory connectionFactory() { 
    return new ActiveMQConnectionFactory(); 
  } 
 
  @Bean 
  public DefaultMessageListenerContainer listenerContainer() { 
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer(); 
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver()); 
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory()); 
    defaultMessageListenerContainer.setDestinationName(queueName); 
    return defaultMessageListenerContainer; 
  } 
 
  @Bean 
  public MessageChannel inboundChannel() { 
    return MessageChannels.direct(channelName).get(); 
  } 
} 
 

Declaring a JMS endpoint for a specific queue is now easy:

 
@Bean 
public JmsEndpoint jmsEndpoint() { 
  return new JmsEndpoint("jms.activeMQ.Test", "inboundChannel", "com.example.stubs"); 
} 

The service activator for inboundChannel:

 
/** 
 * Sample 3, 5 
 * @param shiporder 
 */ 
@ServiceActivator(inputChannel = "inboundChannel") 
public void processMessage(final Shiporder shiporder) { 
  System.out.println(shiporder.getOrderid()); 
  System.out.println(shiporder.getOrderperson()); 
} 

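You should not miss out on using IntegrationFlowAdapter in your projects. I like the concept.

I recently started using the Spring Integration Java DSL in a new Spring Boot based project at Embedit. Even though some configuration is still needed, I find it very useful.

  • It is easy to debug, with no need to add configuration such as a wire tap.
  • It is much easier to read. Yes, even with lambdas!
  • It is powerful. In Java configuration you now have many options.

Source code: https://bitbucket.org/tomask79/spring-integration-java-dsl 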



Reference link for this article: https://www.yisu.com/zixun/204534.html