Y si os dijera que podeis enviar todas las transacciones de MariaDB a Kafka sin tener que hacer practicamente nada? si, de verdad. Esto es Maxwell

Que es Maxwell

Maxwell es un demonio escrito en Java que te permite pasar todos las transacciones de MariaDB / MySQL a Kafka, de forma nativa y segun ellos mismo, con un buen rendimiento.
Yo lo he probado y de repente, se ha puesto como el streamer de-facto en todos nuestros entornos. Se acabo la replicacion de Binlog de MariaDB, se acabo Debezium, se acabaron un monton de cosas…La vida sonrie y todo es mas facil, joer.

Como funciona?

SE-RIA-LI-ZA-CION, ya esta.
Lee todas las transacciones del binlog y los convierte a mensajes de Kafka y los mete en los topics que te de la gana. Es simple y maravilloso. Podria ponerme en modo politico a decir mil cosas vacias, pero es que no tiene mas.
Todos los progresos los guarda en la base de datos que aparece como maxwell, tanto el servidor como el binlog como la posicion del mismo. Esto lo hace muy facil para resetearlo: te cargas la bd y listo.

Bootstraping

Permite el bootstrapping, esto es, meter una tabla entera con un select * from database.table, con lo que puedes importar las tablas a toda pastilla y luego seguir sincronizando con el.
Tiene su miga, ya que hay que meter un registro en la tabla maxwell.bootstrap teniendo mucho cuidado de que base de datos, tabla y sobretodo client_id metes.
Te quedaria algo como asi:

1
2
3
4
5
6
7
8
9
10
11
12
13

maxwell> insert into maxwell.bootstrap (database_name, table_name, client_id) values ('database1', 'table1', 'hadoop-2.dev1.domain.local')
maxwell> select * from maxwell.bootstrap
+----+----------------+-------------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+----------------------+-----------------+------------------------------+
| id | database_name | table_name | where_clause | is_complete | inserted_rows | total_rows | created_at | started_at | completed_at | binlog_file | binlog_position | client_id |
+----+----------------+-------------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+----------------------+-----------------+------------------------------+
| 3 | database1 | table1 | <null> | 1 | 36992 | 0 | <null> | <null> | 2018-06-07 11:59:13 | master-binlog.000033 | 303163669 | hadoop-2.dev1.domain.local |
| 7 | database1 | table1 | <null> | 1 | 38840 | 0 | <null> | 2018-06-11 12:39:26 | 2018-06-11 12:39:27 | master-binlog.000035 | 491373491 | hadoop-2.dev1.domain.local |
| 11 | database1 | table1 | <null> | 1 | 36772 | 0 | <null> | 2018-06-13 11:11:57 | 2018-06-13 11:11:59 | master-binlog.000036 | 747036925 | hadoop-2.dev1.domain.local |
| 13 | database1 | table1 | <null> | 1 | 6002 | 0 | <null> | 2018-06-13 11:24:07 | 2018-06-13 11:24:07 | master-binlog.000036 | 748146191 | hadoop-2.dev1.domain.local |
| 15 | database1 | table1 | <null> | 1 | 78 | 0 | <null> | 2018-06-13 11:32:06 | 2018-06-13 11:32:06 | master-binlog.000036 | 748279359 | hadoop-2.dev1.domain.local |
| 17 | database1 | table1 | <null> | 1 | 1955 | 0 | <null> | 2018-06-13 11:32:06 | 2018-06-13 11:32:07 | master-binlog.000036 | 748540354 | hadoop-2.dev1.domain.local |
+----+----------------+-------------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+----------------------+-----------------+------------------------------+

Ahora, solo a esperar que se conecte y empieze el bootstrap.

Caracteristicas

Ventajas

  • Simple, mas que el mecanismo de un chupete
  • Se actualiza bastante a menudo
  • Lo Devs responden bastante rapido en el gitHub
  • Tiene muchas opciones de configuracion pero sin nada, funciona cuasi del tiron
  • Te permite hacer bootstraping de las tablas para hacer una carga inicial, despues, sincroniza
  • Te coneta con RabbitMQ, Kinesis, Kafka….casi todas. Una delicia
  • Como es JMX, tiene metricas y te las expone el solo en un endpoint

Desventajas

  • Hombre, no es enterprise-made y se nota.
  • La documentacion tiene fallos o cosas mal explicadas (como hacer el bootstraping)
  • Estaria guay que tuviera algun tipo mas de control en el momento de la conexion al binlog (se desconecta mucho)
  • El sistema de bootstraping, no es muy intuitivo ( o yo soy un zote )
  • NO SOPORTA OFICIALMENTE MARIADB. Si, como lo leeis. Pero lo haran y si no usais ejem….idempotencia (cof cof cof…) no pasa nada. Lo explico mas abajo.

Desplegandolo…

Este es el fichero de configuracion que estamos usando nosotros, debidamente limpiado:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
cat config.properties
######### general stuff #############
log_level=info

# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka

######### mysql stuff ###############
host=bbdd.dev1.domain.local
port=3306
user=maxwell
password=<<PASSWORD>>

# name of the mysql database where maxwell keeps its own state
schema_database=maxwell
client_id=kafka.dev1.domain.local

# maxwell can optionally replicate from a different server than where it stores
# schema and binlog position info. Specify that different server here:
#
#
#replication_host=other
#replication_user=username
#replication_password=password
#replication_port=3306

######### output format stuff ###############
#output_binlog_position=true
#output_gtid_position=true

# This controls whether maxwell will output JSON information containing
# DDL (ALTER/CREATE TABLE/ETC) infromation. (default: false)
# See also: ddl_kafka_topic
######### kafka stuff ###############

# list of kafka brokers
kafka.bootstrap.servers=kafka.domain.local:6667
kafka_topic=stream_%{table}

# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed

kafka.compression.type=snappy
kafka.retries=0
kafka.acks=1
#kafka.batch.size=16384
include_dbs=database1,database2
include_tables=table1,tabla2,tabla3,tabla4
metrics_type=jmx,http

# The prefix maxwell will apply to all metrics
metrics_prefix=

# When metrics_type includes http or diagnostic is enabled, this is the port the server will bind to.
http_port=9311

# When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
#http_path_prefix=/some/path/

# To enable Maxwell diagnostic
http_diagnostic=true # default false

# Diagnostic check timeout in milliseconds, required if diagnostic = true
http_diagnostic_timeout=10000 # default 10000

######### misc stuff ###############

# maxwell's bootstrapping functionality has a couple of modes.
#
# In "async" mode, maxwell will output the replication stream while it
# simultaneously outputs the database to the topic. Note that it won't
# output replication data for any tables it is currently bootstrapping -- this
# data will be buffered and output after the bootstrap is complete.
#
# In "sync" mode, maxwell stops the replication stream while it
# outputs bootstrap data.
#
# async mode keeps ops live while bootstrapping, but carries the possibility of
# data loss (due to buffering transactions). sync mode is safer but you
# have to stop replication.
bootstrapper=async

Explicando algunas partes:

host & replication host Contempla la posibilidad de replicar del servidor 1 y guardar toda la info de donde esta replicando en el servidor 2.
client_id Podemos tener varios Maxwell haciendo pulling de diferentes tablas del mismo servidor para tener paralelismo, solo cambia el client_id
kafka_topic=stream_%{table} Esto me parece que es de lo mejor, permite que definas esa variable para realizar el stream de la tabla a su topic con o sin prefijo. Nosotros, filtramos luego las tablas y las BBDDs.
include_db & include_tables estos filtros funcionan en conjunto, esto esto, que si declaro tablas pero no bases de datos, buscara en todas las bases de datos. Al poner base de datos, buscara en esas bases de datos, las tablas.
metrics_type Otra pasada y ademas, bastante completas (si las pones juntos con el Maxwell Exporter, la delicia del Grafanologo! )
http_diagnostics Otra cosa guay lista, un healcheck para controlar con Zabbix, Nagios o Icinga.
bootstraper sync o async. Async hace el bootstraping levantando otro hilo y sin bloquear el sincronizador principal. Pero, tarda mas.

Conclusion

Soluciones ahi afuera hay varias, pero a mi me gusta mucho esta. No es la mejor seguro, pero tiene un punto de equilibrio que nos parece bueno porque:

  • es tonta y muy facil de administrar, si se te muere, limpias la reinicias y listo
  • para ser free, tiene un monton de conectores
  • El perfomance es mas que bueno, segun dicho por los Dev, una vm de 4 cores y poquita RAM, 5K/s sin despeinarse
  • Se deja desplegar muy biena traves de ansible. Incluso lo tengo montado como servicio que monitoriza el PID y cuando se cae la conexion a la BD, lo levanta el solo. Y encima trunca el log….perfecto!
  • Los logs son muy descriptivos, entendibles.
  • Los DEV estan ahi y la mejoran. No me entrañaria nada que sacaran una version comercial. Yo la compraria

No soporta MariaDB….pero da casi lo mismo

Paso que un dia probandolo, se fue al carajo por que segun decia en los logs no entendia la siguiente transaccion DROP DATABASE IF EXIST DATABASE1 Efectivamente, no soporte idempotencia de MariaDB. Teneis que arrancar el demonio a mano en el siguiente evento, sorry. Asi que ya sabeis, nada de hacer las cosas bien: a partir de ahora todos los scripts de sql sin idempotencia }:)

En nuestro modelo de datos, encaja a la perfeccion. Estuvimos probando el Tungsten el cual casi fue el elegido, pero demasiado opensource y artesanal el tema de materializar a Hive. Esto no materializa a Hive, peeeeeero para eso tenemos el kafka-connect y en ejercito de javadevs que lo saben dejar bonito.
Ademas, permite que solo entre entornos tengamos que replicar los kafka usando el KafkaMirror. Nada mas. Menos overhead de administracion y otro tanto de ahorro en el ancho de banda.

Ale, he dicho!


Maxwell-Daemon

Comentarios

⬆︎TOP