Debezium MongoDB connector stops pulling change events after a change of the PRIMARY
Issue
- The Debezium MongoDB connector stops pulling change events after the PRIMARY of the replica set changes.
- The Debezium MongoDB connector reports the following exception in its status:
{"name":"mongodb-connector","connector":{"state":"RUNNING","worker_id":"10.128.2.57:8083"},"tasks": [{"id":0,"state":"FAILED","worker_id":"10.128.2.57:8083","trace":"org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:102)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to read from oplog on 'rs0/mongodb-0.mongodb-internal.strimzi.svc.cluster.local:27017,mongodb-1.mongodb-internal.strimzi.svc.cluster.local:27017,mongodb-2.mongodb-internal.strimzi.svc.cluster.local:27017'\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbStreamingChangeEventSource.java:140)\n\tat io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:282)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$1(MongoDbStreamingChangeEventSource.java:95)\n\t... 
5 more\nCaused by: com.mongodb.MongoSocketReadException: Prematurely reached end of stream\n\tat com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)\n\tat com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:580)\n\tat com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:445)\n\tat com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)\n\tat com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259)\n\tat com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)\n\tat com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:450)\n\tat com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)\n\tat com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:226)\n\tat com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)\n\tat com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)\n\tat com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)\n\tat com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:260)\n\tat com.mongodb.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:216)\n\tat com.mongodb.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:200)\n\tat com.mongodb.client.internal.MongoBatchCursorAdapter.tryNext(MongoBatchCursorAdapter.java:74)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readOplog(MongoDbStreamingChangeEventSource.java:191)\n\tat 
io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$null$0(MongoDbStreamingChangeEventSource.java:96)\n\tat io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:278)\n\t... 6 more\n"}],"type":"source"}
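The status document above is long, and the useful part is the last "Caused by:" line of the task trace. The following is a minimal Python sketch, not part of the original report, that extracts the failed tasks and their innermost cause from such a status document. The helper name `failed_tasks` and the abbreviated `SAMPLE_STATUS` are illustrative assumptions; the sample keeps only the first line and the final cause from the trace shown above.

```python
def failed_tasks(status: dict) -> list:
    """Summarize every FAILED task in a Kafka Connect status document.

    `status` is the parsed JSON status of a connector, in the shape
    shown in this article. The helper name is hypothetical.
    """
    summaries = []
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        trace_lines = task.get("trace", "").splitlines()
        # The last "Caused by:" line is the innermost (most specific) cause.
        causes = [ln for ln in trace_lines if ln.startswith("Caused by:")]
        if causes:
            root_cause = causes[-1][len("Caused by:"):].strip()
        else:
            # No nested cause: fall back to the first line of the trace.
            root_cause = trace_lines[0].strip() if trace_lines else ""
        summaries.append({
            "id": task.get("id"),
            "worker_id": task.get("worker_id"),
            "root_cause": root_cause,
        })
    return summaries


# Abbreviated copy of the status above (stack frames omitted for brevity).
SAMPLE_STATUS = {
    "name": "mongodb-connector",
    "connector": {"state": "RUNNING", "worker_id": "10.128.2.57:8083"},
    "tasks": [{
        "id": 0,
        "state": "FAILED",
        "worker_id": "10.128.2.57:8083",
        "trace": (
            "org.apache.kafka.connect.errors.ConnectException: An exception "
            "occurred in the change event producer. This connector will be stopped.\n"
            "Caused by: org.apache.kafka.connect.errors.ConnectException: "
            "Error while attempting to read from oplog\n"
            "Caused by: com.mongodb.MongoSocketReadException: "
            "Prematurely reached end of stream\n"
        ),
    }],
    "type": "source",
}

if __name__ == "__main__":
    for summary in failed_tasks(SAMPLE_STATUS):
        print(summary)
```

Note that the connector itself is still reported as RUNNING while task 0 is FAILED, so a check of this kind needs to look at the task states and not only at the connector state.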
Environment
- Debezium 1.2.0.Final
- Red Hat AMQ Streams (AMQ Streams) 1.4.0
- Red Hat OpenShift Container Platform (OpenShift) 4