Spring IntegrationでBroadcedDispatcherに加入者を接続できません -- java-8 フィールド と spring-integration フィールド と spring-integration-dsl フィールド と spring-cloud-gcp フィールド 関連 問題

Unable to attach subscriber to BroadCastingDispatcher in Spring Integration












1
vote

問題

日本語

Butiful GCP:Google Cloud Pub / sub(4/8)との統合は、Google Pubsubの購読から読み取られ、別のトピックに書き込むフローを構築しようとしていました。

DEBUG Modeでアプリケーションを起動した後、メッセージが Google PubSub から到着していることがわかりますが、この

のために「消費」されていません。

o.s.i.Dispatcher.BroadcastingDispatcher:購読者なし、デフォルトの動作は無視

これに関する助けが大歓迎です。

以下の主要なコードのように見える方法です -

<事前> <コード> public class PubsubRouteBuilderService { private final PubSubTemplate pubSubTemplate; // injected via Spring public PubsubRouteBuilderService(PubSubTemplate pubSubTemplate) { this.pubSubTemplate = pubSubTemplate; } public synchronized boolean buildRoute(PubsubRouteModel pubsubRouteModel) { log.info("Building route for: {}", pubsubRouteModel); buildPubsubRoute(pubsubRouteModel); // some unrelated logic return true; } private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) { final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from( RouteBuilderFactory .messageChannelAdapter( RouteBuilderFactory.getMessageChannel(), pubSubTemplate, pubsubRouteModel.getFromSub())) .handle( message -> { log.info("consumed new message: [" + message.getPayload() + "]"); AckReplyConsumer consumer = message.getHeaders() .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class); consumer.ack(); }) .get(); standardIntegrationFlow.start(); } }

次のように<コード> RouteBuilderFactory からの他の方法です。 <事前> <コード> public static MessageChannel getMessageChannel() { return MessageChannels.publishSubscribe().get(); } public static PubSubInboundChannelAdapter messageChannelAdapter(MessageChannel inputChannel, PubSubTemplate pubSubTemplate, String channelName) { PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, channelName); adapter.setOutputChannel(inputChannel); adapter.setAckMode(AckMode.MANUAL); return adapter; }

英語

Following this example at Bootiful GCP: Integration with Google Cloud Pub/Sub (4/8), I was trying to build a flow where I read from Google Pubsub subscription and write to another topic.

After starting my application on DEBUG mode, I can see that the messages are arriving from Google PubSub but they are not getting "consumed" because of this

o.s.i.dispatcher.BroadcastingDispatcher : No subscribers, default behavior is ignore

Any help on this is much appreciated.

Following is how my major code looks like-

public class PubsubRouteBuilderService {      private final PubSubTemplate pubSubTemplate; // injected via Spring      public PubsubRouteBuilderService(PubSubTemplate pubSubTemplate) {         this.pubSubTemplate = pubSubTemplate;     }      public synchronized boolean buildRoute(PubsubRouteModel pubsubRouteModel) {         log.info("Building route for: {}", pubsubRouteModel);         buildPubsubRoute(pubsubRouteModel);         // some unrelated logic         return true;     }      private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {          final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(             RouteBuilderFactory                     .messageChannelAdapter(                             RouteBuilderFactory.getMessageChannel(),                             pubSubTemplate,                             pubsubRouteModel.getFromSub()))             .handle(                     message -> {                         log.info("consumed new message: [" + message.getPayload() + "]");                         AckReplyConsumer consumer = message.getHeaders()                                 .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);                         consumer.ack();                     })             .get();          standardIntegrationFlow.start();     } } 

Here are the other methods from RouteBuilderFactory as follows-

public static MessageChannel getMessageChannel() {     return MessageChannels.publishSubscribe().get(); }  public static PubSubInboundChannelAdapter messageChannelAdapter(MessageChannel inputChannel, PubSubTemplate pubSubTemplate, String channelName) {     PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, channelName);     adapter.setOutputChannel(inputChannel);     adapter.setAckMode(AckMode.MANUAL);     return adapter; } 
</div
           

回答リスト

0
 
vote
vote
ベストアンサー
 

あなたのコードはそのブログ投稿に基づいていないようです...

 <コード> private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {      final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(         RouteBuilderFactory                 .messageChannelAdapter(                         RouteBuilderFactory.getMessageChannel(),                         pubSubTemplate,                         pubsubRouteModel.getFromSub()))         .handle(                 message -> {                     log.info("consumed new message: [" + message.getPayload() + "]");                     AckReplyConsumer consumer = message.getHeaders()                             .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);                     consumer.ack();                 })         .get();      standardIntegrationFlow.start(); }   

任意の IntegrationFlow Object - Springによって管理する必要があります(<コード> len(ready)0 として宣言されています)。

この作業をすべて行うために枠組みがシーンの後ろに建設するいくつかのインフラストラクチャがあります。

 

Your code doesn't seem to be based on that blog post at all...

private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {      final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(         RouteBuilderFactory                 .messageChannelAdapter(                         RouteBuilderFactory.getMessageChannel(),                         pubSubTemplate,                         pubsubRouteModel.getFromSub()))         .handle(                 message -> {                     log.info("consumed new message: [" + message.getPayload() + "]");                     AckReplyConsumer consumer = message.getHeaders()                             .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);                     consumer.ack();                 })         .get();      standardIntegrationFlow.start(); } 

You can't just "start" some arbitrary IntegrationFlow object - it must be managed by Spring (declared as a @Bean).

There's some infrastructure that the framework builds behind the scenes to make all this work.

</div
 
 
     
     

関連する質問

0  Spring Boot:各リクエストの特定のトピックにメッセージをプッシュする  ( Spring boot push message to specific topic for each request ) 
スプリングブーツとのPUBサブ統合を使用しています。 <事前> <コード> @Configuration public class PubSubConfiguration { @Value("${spring.pubsub.topic.name}")...

1  Google Cloud SQLからPG_CRONを使用する方法はありますか?  ( Is there way to use pg cron from the google cloud sql ) 
現在、データベースをPostgreSQLからGoogle Cloud SQLに移動する必要があります。 pg_cronを使ってこのような古いレコードを削除します。 <事前> <コード> SELECT cron.schedule('30 3 * * 6', $$...

1  Spring IntegrationでBroadcedDispatcherに加入者を接続できません  ( Unable to attach subscriber to broadcastingdispatcher in spring integration ) 
Butiful GCP:Google Cloud Pub / sub(4/8)との統合は、Google Pubsubの購読から読み取られ、別のトピックに書き込むフローを構築しようとしていました。 DEBUG Modeでアプリケーションを起動した後、メ...

0  GoogleデータストアエンティティはIDを返しません。エンティティIDはNULLです  ( Google datastore entity does not return id entity id is null ) 
私はデータ型エンティティから延長される製品エンティティを持っています。このように: @ <事前> <コード> Entity(name = "Product") @Getter @Setter public class ProductEntity exten...

0  Spring Cloud GCP DataStoreReeRepository autoconfigまたは注釈なしでintialize  ( Spring cloud gcp datastorerepository intialize without autoconfig or annotations ) 
私は GCPデータストアサンプルが手動で初期化したいです。 SimpleJparePository 他のSOである。 DataStoreRepositoryを初期化することは可能ですか? JPAリポジトリを使用した例を簡単にすることはできます。 Empl...

0  クラウドSQLへの接続は失敗しています  ( Connecting to cloud sql is failing ) 
クラウドエディタの役割を持つサービスアカウントを使用してGCP Cloud SQLに接続しようとしています。ここに与えられた例に続いています href = "https://github.com/spring-cloud/spring-cloud-gcp/t...

0  クラスタから各ノードがGoogle Pub / Subトピックからメッセージを受け付けることを実現する方法  ( How to achieve that each node from the cluster accepts message from the google p ) 
TGEサブスクリプションからメッセージを聴くクラスタアプリケーションを持っています だから私は1つのトピックと1つの概要を持っています。次のスキーマはロードバランサとして機能するため、クラスタ内の各ノードは一意のメッセージを受け入れます。こののように Pr...

6  メッセージはGoogle Pub / Subサブスクリプションから再び認識した後に再び受信されます[Heisenbug]  ( Message is received from google pub sub subscription again and again after ackno ) 
私が説明するシナリーは十分に稀であり、ほとんどの場合、すべてが期待どおりに機能することに気付きたいです。 PUB / SUB側に1つのトピックと1個のサブスクリプションがあります。 サブスクリプションのための私のJavaアプリケーションListensは、何...

2  アプリケーションは、認証後もGoogle Pub / Subからの重複メッセージを受け入れます  ( Application accepts duplicated messages from google pub sub even after acknowled ) 
GCP環境でアプリケーションをテストするとき、非常に奇妙なフローティングバグ()を経験します。私は再現するための具体的なステップを見つけることができませんが、それは本当に時々起こります。 メッセージが正常に認められたことがわかりました。 <事前> <コード>...

3  Spring GCPで複数のGoogle Pubsubプロジェクトを購読する方法  ( How to subscribe to multiple google pubsub projects in spring gcp ) 
Spring Bootアプリケーションで複数のGoogle Cloud Pubsubプロジェクトを購読したいです。 スプリングクラウド付きの1スプリングブートアプリケーションで2つのPUBSUB GCPプロジェクトを配線/構成する方法?複数のGoogleプロ...




© 2022 cndgn.com All Rights Reserved. Q&Aハウス 全著作権所有