Como replicar Kafkas usando kafkaMirror y los problemas que nos encontramos en el proceso

Replicar kafkas entre varios DCs hacia un Datalake central? pan comido! o eso pense yo. Esta es la historia de la dura realidad :)

Cuando empezaron a subir las facturas de precio, el ancho de banda y la gestion, se dieron cuenta de que eso era una castaña pilonga y que eso solo nos iba a llevar a tener problemas de toda indole en el futuro.

De ahi, migramos a lo que tenemos ahora entre manos, basicamente, en cada DC tenemos un cluster de Kafka y Zookeeper que funciona internamente.
A traves del KafkaMirror, lo replicado a un datalake centralizado donde estrujamos bien estrujao el cluster de Hadoop.

El problema viene, porque KafkaMirror segun se tiene entendido, funciona: superguay OOTB y no deberia de fallar o darte problemas……..:/
Bueno, a mi me ha dado toda clase de problemas, el ultimo me he llevado 3 dias de intenso esfuerzo y ir comprobando una por una las configuraciones, hasta dar con el maldito (una chorrada, pero lo das por supuesto y no es asi)

Como funciona

KafkaMirror es un simple consumer con un producer embedido, de manera, que se conectar como consumer a el Kafka que quieras y lo inyecta localmente con el producer en el kafka local.
Como es un consumer, te permite tener multithread usando las particiones de Kafka, por cada particion, puede decirle a un kafkaMirror que consuma ese thread, de manera que un solo kafkaMirror leyendo de 5 threads puede consumir 5 particiones a la vez.
Si a esto le sumamos que podemos correr varios mirrors en varios kafka, nos da la posibilidad de tener un throughput elevado (ya que leemos a la vez de varias particiones) y un failover (ya que tenemos varios kafkaMirror leyendo a la vez del mismo kafka de origen).

Toda la info de los offsets (cuales son los ultimos mensajes consumidos) asi como de lo integrantes del mirror, los gestiona a traves de los consumer groups.
En el oldConsumer, esta informacion se guarda en zookeeper creando un znode, en el newConsumer se guarda directamente en los brokers de kafka (y a este tema volveremos luego, porque esa fue la raiz de nuestro problema).
Cuando los miembros del kafkaMirror pertenecen al mismo consumerGroup es cuando la carga se distribuye entre todos los miembros de la misma, de manera que tenemos varios servers de kafka corriendo kafka mirror que se entienden entre ellos y se distribuyen que topics van a consumir del kafka de origen.
La verdad que el sistema funciona bien, muy bien…hasta que deja de hacerlo.

Nuestro problema

La topografia de nuestros Kafka es sencilla: por cada DC repartido por el mundo, tenemos un kafka+zookeeper que alimenta lo microservicios, los llamaremos PROx.
Luego tenemos el DataLake local, que usamos para pruebas, BI, desarrollo, etc. Lo llamamos DWdev-1 (si, nos hemos roto la cabeza con el nombre). Este se conecta como consumer a los PROx e inyecta lo mensajes al kafka de DWdev-1. Luego, tenemos el DWpro-1 que es el gordo, aqui se corre toda la BI, Reports, etc de todos los entornos y son un rack de jierros.
Cuando la info esta en el Kafka de destino, en DWdev-1 y DWpro-1, a traves del KafkaConnect, parseamos y ponemos guapa toda la info para meterla en Hive y poder usarla. Este kafkaconnect lee mensajes como un consumer mas, los parsea y los inyecta en HDFS en formato ORC para que Hive los lea con el mejor performance (esto ultimo despues de venir de la Data Works summit de Berlin, es discutible.)

Bien, el problema viene porque el KafkaConnect SOLO acepta el newConsumer, que es, el consumidor de bajo nivel que se conecta a los brokers de kafka, lee la info de los topics, particiones y leader de las particiones del __consumeroffsets en forma de metadata.
El oldConsumer hace lo mismo, pero lo lee de zooKeeper y este le da los metadatos de los broker.
Bien, a nosotros solos nos funcionaba correctamente cuando usabamos el OldCOnsumer, como un tiro. Con el newConsumer simplemente se quedaba muerto.
En los logs no indicaba nada, en el tcpdump tampoco….muerto.

Cual era al problema?

Maldito __consumeroffsets

Basicamente, toda la info de los topics, mensajes y demas, kafka la guarda en una carpeta, en nuestro caso esta en un raid sin LVM llamado /kafka-data.
Todos los topics que tiene en memoria antes de que expiren, ahi estan.
Cuando un broker viene a la vida y a este mundo, se registra si o si con zookeeper, le dice que ID de broker tiene, su hostname, su puerto, una especie de GUID y le da un besito en la frente. Esta info tambien se guarda en el __consumeroffsets y se guarda en la carpeta /kafka-data.
Si por algun casual el id del broker cambia, este es modificado en el zNode del zooKeeper…pero no en el __consumeroffsets :( con lo que cual, el oldConsumer funciona pero el newConsumer se queda esperando que los brokers con los ID asignados a las particiones manden los datos. Da igual que el hostname sea el mismo, no funciona. Simplemente, no sabe que hacer. Ni falla, ni da timeout, ni excepcion, nada.

Como descubri este problema? al agregar al newConsumer (siguiendo este hilo de stackoverflow) --partition 0 fuerzas que te traiga todo lo que contenga esa particion. De manera que no esperas a tener los metadatos, sino que se los das tu, ademas, si intentabas traerte los mensajes del __consumeroffsets te decia que no habia leader para ese topic (waaat?!?!??!)
Asi que te metias en el zookeeper y mirabas quien era el leader de las particiones de |__consumeroffsets te decia que eran los servers con ID:

  • 1001
  • 1002
  • 1002

y sin embargo en el zNode, en brokers te decia que los servers tenian el ID:

  • 1010
  • 1011
  • 1012

Total, que el newConsumer esperaba obtener los particiones de los servers 1001,1002 y 1003 pero el oldConsumer sabia que los leader de los topics eran los servers 1010, 1011 y 1012

La solucion? borrar fisicamente de /kafka-data el /__consumeroffsets y reiniciar los kafka, mano de santo.

Como montar la replicacion y no morir en el intento

Despues de este pequeno detour y de explicar los motivos de montar la replicacion, vamos a empezar con como los he montado, un punto imporante es tener el kafkaMirror como un servicio, pero no es mas que un sh al que le pasas parametros, con lo cual entra en juego daemonize que basicamente, daemoniza todo.
Luego, kafkaMirror necesita de una configuracion de un consumer para leer del kafka de origen y una de consumer, para escribir en el destino. Lo que he hecho es llamar a un wrapper que corre el daemonize que a su vez ejecuta el sh de consumer / producer con los parametros.

  • instalamos daemonize
    yum install daemonize -y

  • creamos las carpetas para el daemon (PID, log y lock) y le ponemos los permisos que creamos oportunos
    mkdir -p /opt/kafkaMirror/daemon; chown -Rvf kafka.kafka /opt/kafkaMirror; chmod -Rvf 700 /opt/kafkaMirror

  • transferimos la configuracion del consumer (Esta es la configuracion que yo he encontrado que es buena como punta de partida)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    # consumer (from remote kafka >>> DWdev-1)
    #################################
    #OLD method

    zookeeper.connect=ZOOKEEPER:PUERTO
    zookeeper.session.timeout.ms=10000
    zookeeper.sync.time.ms=5000
    rebalance.max.retries=10
    auto.offset.reset=largest
    consumer.timeout.ms=5000
    exclude.external.topics=true
    fetch.message.max.bytes=10000000
    rebalance.backoff.ms=10000
    socket.receive.buffer.bytes=33554432
    partition.assignment.strategy=roundrobin

    #To commit on zookeeper and kafkabrokers
    dual.commit.enabled=true
    ############

    #NEW method
    #bootstrap.servers=KAFKABROKER:PUERTO
    #
    #Retrieve the last commited offset (instead the earliest one)
    #auto.offset.reset=latest
    #
    #heartbeat.interval.ms=6000
    #session.timeout.ms=10000
    #
    #auto commit to zookeeper
    #enable.auto.commit=true
    #exclude.internal.topics=true
    #request.timeout.ms=20000
    #
    #Leader metadata refresh
    #metadata.max.age.ms=120000
    #reconnect.backoff.max.ms=5000
    #retry.backoff.ms=1000

    ############

    #STANDAR config
    client.id=HOSTNAME
    group.id=DWdev1
  • transferimos la configuracion del produccer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    #KafkaMirror Producer
    bootstrap.servers=KAFKABROKERS:PUERTO
    connect.timeout.ms=20000
    (Disabled due to the kafka version: should be enabled on 10.2 (we are on 10.1))
    #producer.type=async
    #key.class=kafka.serializer.DefaultEncoder
    (https://engineering.salesforce.com/mirrormaker-performance-tuning-63afaed12c21)
    compression.type=gzip
    linger.ms=15000
    batch.size=50000
    buffer.memory=2000000000
    max.request.size=1000000
    #avoid data lost
    #https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_kafka-component-guide/content/avoiding-data-loss.html
    block.on.buffer.full=true
    acks=-1
    max.in.flight.requests.per.connection=1
    retries=5
  • transferimos el wrapper, uno por servicio si teneis como yo, que traer datos de varios entorno a uno central
    wrapper_.sh

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    #!/bin/bash
    #Nicolas Tobias twitter: @nicolastobias_ ## Marzo-2017
    #Simple Wrapper
    ################################################################################
    daemonize -c /opt/kafkaMirror/daemon /
    -e /opt/kafkaMirror/daemon/<KM-SERVICIO>.error.log /
    -o /opt/kafkaMirror/daemon/<KM-SERVICIO>.ouput.log /
    -p /opt/kafkaMirror/daemon/<KM-SERVICIO>.pid /
    -l /opt/kafkaMirror/daemon/<KM-SERVICIO>.lock /usr/hdp/current/kafka-broker/bin/kafka-mirror-maker.sh /
    --consumer.config /opt/kafkaMirror/<ENV>-consumer.cfg /
    --producer.config /opt/kafkaMirror/<ENV>-producer.cfg /
    --whitelist="topic1,topic2,..."
  • ajustamos los permisos al usuario kafka
    chown -Rvf kafka.kafka /opt/kafkaMirror; chmod -Rvf 700 /opt/kafkaMirror

  • habilitamos el servicio en systemd un fichero como este por servicio en
    /usr/lib/systemd/system/<SERVICIO>.service

    1
    2
    3
    4
    5
    6
    7
    8
    9
    [Unit]
    Description= <DESCRIPCION>

    [Service]
    Type=forking
    ExecStart=/opt/kafkaMirror/daemon/<WRAPPER SCRIPT>

    [Install]
    WantedBy=multi-user.target
  • ponemos los permisos correctos de 0755 al wrapper

  • recargamos el daemon de systemd
    systemctl daemon-reload
  • habilitamos y arrancamos
    systemctl enable <SERVICIO>; systemctl start <SERVICIO>

Si todo ha ido bien, aunque no sea un error, el daemonize se toma la salida del kafkamirror de consola como un error y lo mando al fichero .error.log, asi que mirar ahi a ver si os da algun mensaje.

listo!
Con esto, deberiais de tener el KafkaMirror arrancado en un servicio que usando systemctl status os dara el estado del servicio.

<< CORP >>> root@hdp-dw-1-nn-1:/home/nicolast# systemctl status kafkaMirror-pro2
kafkaMirror-pro2.service - kafkaMirror-pro2 > DWdev-1
   Loaded: loaded (/usr/lib/systemd/system/kafkaMirror-pro2.service; enabled; vendor preset: disabled)
   Active: active (running) since Fri 2018-04-13 16:05:53 CEST; 1 weeks 2 days ago
  Process: 40948 ExecStart=/opt/kafkaMirror/daemon/kafkaMirror-pro2-wrapper.sh (code=exited, status=0/SUCCESS)
 Main PID: 40950 (java)
   CGroup: /system.slice/kafkaMirror-pro2.service
           └─40950 /usr/jdk64/jdk1.8.0_112/bin/java -Xmx256M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:Initiat...

Apr 13 16:05:53 hdp-dw-1-nn-1.domain.tld systemd[1]: Starting kafkaMirror-pro2 > DWdev-1...
Apr 13 16:05:53 hdp-dw-1-nn-1.domain.tld systemd[1]: Started kafkaMirror-pro2 > DWdev-1.

Playbook de ansible

Podria ponerlo en github pero por el momento no he tenido tiempo, asi que si quereis, asi teneis lo teneis zipeado.
Echarlo un ojo al vars para configurar la variables a gusto :)

KafkaMirror Playbook

Comentarios

⬆︎TOP