Debezium MongoDB connector stops pulling change events after the PRIMARY changes

Solution Verified

Issue

  • The Debezium MongoDB connector stops pulling change events after the PRIMARY of the replica set changes.
  • The connector status reports a FAILED task with the following exception:

    {"name":"mongodb-connector","connector":{"state":"RUNNING","worker_id":"10.128.2.57:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"10.128.2.57:8083","trace":"org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:102)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to read from oplog on 'rs0/mongodb-0.mongodb-internal.strimzi.svc.cluster.local:27017,mongodb-1.mongodb-internal.strimzi.svc.cluster.local:27017,mongodb-2.mongodb-internal.strimzi.svc.cluster.local:27017'\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbStreamingChangeEventSource.java:140)\n\tat io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:282)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:95)\n\t... 5 more\nCaused by: com.mongodb.MongoSocketReadException: Prematurely reached end of stream\n\tat com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)\n\tat com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:580)\n\tat com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:445)\n\tat com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)\n\tat com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259)\n\tat com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)\n\tat com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:450)\n\tat com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)\n\tat com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:226)\n\tat com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)\n\tat com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)\n\tat com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)\n\tat com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:260)\n\tat com.mongodb.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:216)\n\tat com.mongodb.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:200)\n\tat com.mongodb.client.internal.MongoBatchCursorAdapter.tryNext(MongoBatchCursorAdapter.java:74)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readOplog(MongoDbStreamingChangeEventSource.java:191)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$0(MongoDbStreamingChangeEventSource.java:96)\n\tat io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:278)\n\t... 6 more\n"}],"type":"source"}
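
The status output above has the shape returned by the Kafka Connect REST API endpoint `GET /connectors/<name>/status`. The trace is hard to read as a single JSON string, so the following is a minimal Python sketch of how the failed tasks and their innermost `Caused by:` line might be extracted from it. The helper names `fetch_status` and `failed_tasks` are hypothetical and not part of Debezium or Kafka Connect, and the base URL passed to `fetch_status` is an assumption that depends on how the Connect REST endpoint is exposed in your cluster.

```python
import json
import urllib.request


def fetch_status(base_url, connector):
    """Fetch the connector status from the Kafka Connect REST API.

    base_url is an assumption, for example "http://localhost:8083".
    """
    url = f"{base_url}/connectors/{connector}/status"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def failed_tasks(status):
    """Return (task id, worker id, root cause) for every FAILED task.

    The root cause is the last "Caused by:" line of the stack trace,
    or the first trace line when no "Caused by:" line is present.
    """
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        lines = task.get("trace", "").splitlines()
        prefix = "Caused by: "
        causes = [line[len(prefix):] for line in lines if line.startswith(prefix)]
        if causes:
            root = causes[-1]
        else:
            root = lines[0] if lines else ""
        failures.append((task["id"], task.get("worker_id"), root))
    return failures
```

Applied to the status shown above, for instance via `failed_tasks(fetch_status("http://localhost:8083", "mongodb-connector"))`, this would report task 0 with `com.mongodb.MongoSocketReadException: Prematurely reached end of stream` as the root cause, even though the connector itself is still listed as RUNNING.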
    

Environment

  • Debezium 1.2.0.Final
  • Red Hat AMQ Streams (AMQ Streams) 1.4.0
  • Red Hat OpenShift Container Platform (OpenShift) 4
