Já escrevi sobre as dores de se trabalhar com engenharia de dados e muitas delas aparecem quando se utiliza Kafka. Não é uma crítica ao software – tem ótimas sacadas de design para escalabilidade como o princípio de zero cópias, usar diretamente o filesystem, pull ao invés de push e load balancing – mas tudo isso traz uma complexidade intrínseca, que cobra um preço dos usuários.

Ler dados de um tópico Kafka costuma ser bem rápido, rápido demais inclusive. Ao consumir um tópico, o problema costuma variar entre dois extremos: baixo throughput, porque a aplicação quer manter um consumo exactly-once sem concorrência/paralelismo, ou overflow do consumidor porque ela está lendo mais rápido do que consegue processar.

Quando o assunto Kafka é abordado, discute-se muito as questões de infra do ambiente (e.g. número de partições, réplicas e picos de uso). O que pretendo abordar nesse post, é o lado de desenvolvimento, como construir e escalar uma aplicação que consome os dados a partir de um tópico.

Primeiro, farei uma breve introdução a eventos e conceitos básicos importantes para se trabalhar com Kafka. Depois, propor um problema hipotético e entender as abordagens possíveis. Por fim, implementar as possíveis soluções para tangibilizar a discussão em números e código.

Por que eventos?

A ideia de eventos se popularizou bastante nos últimos anos e um cenário em que ela funciona muito bem, é o problema de mensageria: quando algo acontece no sistema, uma compra confirmada em um sistema de e-commerce por exemplo, é necessário notificar o usuário via e-mail.

A solução imediata é fazer uma integração síncrona via API, confirmando a compra após finalizar todas etapas, incluindo o envio do e-mail.

Sistema Vendas{message_id: "fsd3-..." client_id:: "a324-...", products: [ {sku: "231", quantity: 1} ...] address:}MensageriaComunicação síncrona

Mesmo em uma escala pequena, esse tipo de solução normalmente incorre em uma série de problemas:

  • enviar e-mails é um processo demorado e deixar uma requisição “presa” enquanto espera o envio, pode sobrecarregar tanto os sistema de vendas quanto a mensageria;

  • é comum usar serviços externos para envio de e-mails, o que aumenta problemas de latência e riscos de indisponibilidade;

  • nem sempre as comunicações precisam (ou devem) ser enviadas logo após a compra, comunicações e canais podem ter políticas de envio complexas;

  • o e-mail normalmente não é parte crítica do processo: faz mais sentido vender e não enviar o e-mail, do que perder uma venda por não conseguir enviar o e-mail.

Dado esse cenário, é normal partir para uma solução orientada a eventos, adicionando um message broker para mediar a comunicação entre o transacional e a mensageria.

Sistema Vendas{message_id: "fsd3-..." client_id:: "a324-...", products: [ {sku: "231", quantity: 1} ...] address:}MensageriaComunicação assíncronaMessage Brocker{message_id: "fsd3-..." client_id:: "a324-...", products: [ {sku: "231", quantity: 3} ...] address:}{message_id: "fsd3-..." client_id:: "352j-...", products: [ {sku: "231", quantity: 1} ...] address:}

Nessa solução, o sistema de vendas coloca na fila os e-mails que precisam ser enviados, mas não aguarda o envio pela mensageria. Ao desacoplar os processos e adotar uma arquitetura orientada a eventos, temos uma série de vantagens:

  • postar mensagens em um broker, costumar ser uma operação simples e estável. Isso torna o sistema de vendas mais rápido, mais fácil de dimensionar e escalar;

  • o envio de e-mails – um processo complexo e demorado – não precisa mais ser dimensionado para o pico, apenas o broker precisa suportar o pior caso;

  • apesar do broker ser uma peça a mais para manter, na prática costuma ser mais simples garantir a disponibilidade de um broker que de um sistema como a mensageria.

Por outro lado, temos desvantagens, especialmente pensando nas garantias de sucesso:

  • o sistema de vendas não sabe quando o e-mail foi enviado, nem mesmo se houve problemas no envio;

  • a observabilidade é mais difícil por envolver mais peças, especialmente erros que envolvem perda de mensagens;

  • programação assíncrona é mais complexa que síncrona, esse tipo de solução costuma ser um ônus para os desenvolvedores.

A questão é ponderar esses trade-offs, o exemplo da mensageria é um cenário em que faz bastante sentido usar eventos, mas em outros casos pode ser necessário uma análise mais criteriosa antes de adotar uma abordagem assíncrona.

O cenário proposto é um exemplo de comunicação ponta a ponta usando eventos, mas é comum existir a necessidade de mútiplos consumidores. Por exemplo, além de enviar os eventos para o sistema de mensageria, salvar o que foi enviado em um ambiente analítico. Para lidar com esse cenário, uma opção interessante é usar o conceito de tópicos ao invés de filas.

Fila ou PubSub?

O padrão para comunicação ponta-a-ponta, é utilizar uma fila: o broker mantém as mensagens guardadas até que o consumidor dê um “ack”, indicando que uma mensagem específica pode ser removida.

Sistema Vendas{message_id: "fsd3-..." client_id:: "a324-...", products: [ {sku: "231", quantity: 1} ...] address:}MensageriaFilaMessage Brocker{message_id: "fsd3-..." client_id:: "a324-...", products: [ {sku: "231", quantity: 3} ...] address:}{message_id: "fsd3-..." client_id:: "352j-...", products: [ {sku: "231", quantity: 1} ...] address:}

O problema é que esse padrão não suporta múltiplos consumidores, é necessário replicar e alimentar a fila para cada consumidor adicional. Não é uma solução escalável, já que a quantidade de mensagens cresce em função do número de consumidores.

A alternativa, para lidar com um cenário de múltiplos consumidores, é utilizar a abordagem PubSub adotada pelo Kafka. Nesse modelo, o broker guarda o conteúdo por um tempo determinado (e.g. 7 dias), fica a cargo da aplicação consumidora controlar o offset para saber o que foi lido.

PubSubSistema Vendas{message_id: "fsd3-..." client_id:: "a324-...", products: [ {sku: "231", quantity: 1} ...] address:}MensageriaMessage Brocker{message_id: "fsd3-..." client_id:: "a324-...", products: [ {sku: "231", quantity: 3} ...] address:}{message_id: "fsd3-..." client_id:: "352j-...", products: [ {sku: "231", quantity: 1} ...] address:}Analyticsoffset:mensageria:2{message_id: "fsd3-..." client_id:: "a324-...", products: [ {sku: "231", quantity: 1} ...] address:}offset:analytics:4

No cenário acima, temos dois consumidores, cada um com um offset diferente. Como mandar e-mails é um processo mais demorado que salvar no ambiente analítico, então é normal o offset da mensageria ficar “atrasado” em relação ao processo analítico.

Essa abordagem de PubSub é mais flexível, pois facilita o cenário de múltiplos consumidores, que é uma demanda bastante comum. Por que utilizar filas então, se existe essa limitação de um único consumidor? A simplicidade é o principal motivo, já que o paradgima PubSub tem vários detalhes que precisam ser tratados.

Ao trabalhar com offsets, existem diferentes formas de lidar com a atualização a depender das semânticas de consumo: at-most-once, exactly-once e at-least-one. Usando filas, basta dar um “ack” para garantir um consumo exactly-once.

Arquitetos e programadores

Em geral, os trade-offs de síncrono e assíncrono se repetem, independente do problema de negócio e do tipo de broker utilizado. Da perspectiva de arquitetura, quase sempre é melhor ser assíncrono e no modelo PubSub, mas programadores ficam ressabiados com os desafios de implementação.

Arquitetos se preocupam muito com escalabilidade e custos, pontos fortes da comunicação assíncrona. Para os desenvolvedores, por outro lado, é necessário mais esforço para sustentação e os desafios de desenvolvimento são maiores.

O mundo seria mais simples para programadores, se os sistemas fossem sempre integrados usandos APIs REST síncronas, mas a realidade é que precisamos lidar com tópicos e o paradigma PubSub.

Semânticas de consumo

A mensageria é um problema que se encaixa bem com a ideia de eventos, mas é necessário cuidado ao usar tópicos no lugar de filas. A forma de lidar com o consumo depende da natureza da comunicação, uma decisão que precisa ser tomada com o negócio em vista.

Podemos consumir um tópico seguindo uma dessas semânticas:

  • at-most-once significa que a mensagem será consumida no máximo uma vez, mas pode ser perdida;
  • at-least-once significa que todas as mensagens serão consumidas uma vez, mas podem ser consumidas mais de uma vez;
  • exactly-once significa que todas as mensagens serão consumidas uma única vez.

O ideal seria tudo ser exactly-once, mas não é algo que vem de graça. Para garantir essa semântica, é necessário um controle externo de offsets e há limites para a escalabilidade do consumidor, enquanto as outras abordagens são mais simples e escaláveis.

Considerando o cenário de e-mails para confirmação de compra, é interessante pensar na abordagem exactly-once, mesmo com as dificuldades:

  • mensagens enviadas por e-mails, normalmente não demandam urgência e são pouco volumosas, o que reduz os problema de escalabilidade;

  • por outro lado, enviar e-mails repetidos é algo ruim para experiência e pode ser classificado como spam pelos serviços;

  • no cenário de um fluxo de compras, perder mensagens com comprovantes pode ser problemático para o consumidor e o negócio.

Essa discussão seria muito diferente, se pensarmos em mensageria de notificações para uma rede social por exemplo, que normalmente é um cenário de grande volume e menor criticidade. As vantagens de escalabilidade de uma abordagem at-most-once ou at-least-once podem ser indispensáveis, mesmo que exista o risco de perder ou repetir notificações.

Garantindo exactly-once

Começando pelo cenário exactly-once, podemos nos basear na solução do tutorial da Confluent, que é a implementação mais simples possível de um consumidor.

try:

    while True:

        msg = consumer.poll(timeout=1.0)
        if msg is None: continue

        process_message(conn, msg.value().decode())

finally:
    consumer.close()

Nessa solução, cada mensagem consumida é processada logo em seguida, mas não há controle explícito dos offsets. Como o objetivo é trabalhar com exactly-once, o ideal é desativar o commitautomático. O consumidor deve controlar a atualização manualmente e de forma síncrona programaticamente.

try:

    count = 0    
    start_time = datetime.now()

    while True:

        msg = consumer.poll(timeout=1.0)
        if msg is None: continue

        process_message(conn, msg.value().decode())

        consumer.commit() # Commit explícito para cada mensagem

finally:
    consumer.close()

Nesse código, aparentemente é garantido o consumo exactly-once, mas e se houver um erro ao executar process_message? Nesse cenário, se faz necessário controle externo – fila morta, banco de dados, cache – para guardar as mensagens que deram erro.

try:

    count = 0    
    start_time = datetime.now()

    while True:

        msg = consumer.poll(timeout=1.0)
        if msg is None: continue

        ok = process_message(conn, msg.value().decode())

        if not ok:
          dlq(msg) # Salvando mensagens em uma dlq (dead letter queue)

        consumer.commit()

        if count == num_records: break

finally:
    consumer.close()

Sem um controle externo para salvar as mensagens com erro, é necessário parar o processamento do tópico: não é possível atualizar o offset para a mensagem \(x_{i}\), se existe uma mensagem \(x_{j}\) \(\forall j < i\) não processada.

Para esse cenário exactly-once, as filas costumam funcionar melhor, já que as mensagens são controladas individualmente e não por offsets. Soluções de fila, como SQS da Amazon e RabbitMQ por exemplo, têm o recurso de fila morta para facilitar a gestão de mensagens problemáticas.

No caso do tópico Kafka, o usuário precisa estar ciente desse problema e implementar uma solução por fora. Ou seja, é possível implementar a semântica exactly-once em tópicos, mas é necessário cuidado com offsets e talvez demande ferramentas extras para o tratamento adequado de erros.

Apesar de não ser a melhor estratatégia para consumir um tópico, recomendo começar pela semântica exactly-once até que necessidades de escalabilidade apareçam. As outras semânticas podem trazer ganhos expressivos, mas criam outras dificuldades e perde-se garantias.

Um equívoco comum, quando surge a necessidade de escalar um consumidor Kafka, é misturar a leitura do tópico com o processamento da mensagem. É importante entender essa diferença, porque a solução dos problemas vão em direção oposta: aumentar o throughput de leitura, quando o gargalo é de processamento, poder causar overflow no consumidor.

A ideia é discutir os gargalos de processamento das mensagens, não do consumo propriamente dito, pois vejo poucas discussões sobre o assunto e muitas dificuldades por parte dos desenvolvedores. É uma questão que depende muito do problema a ser resolvido – aplicar um modelo de machine learning, salvar as mensagens em um banco de dados e agregar informações são tarefas muito diferentes – mas existem estratégias que podem ser aplicadas otogonalmente à natureza do problema.

Um problema “I/O bound”

Voltando ao nosso cenário de mensageria, imagine que essas mensagens precisem ser salvas em um banco de dados, para uso analítico e acompanhamento da operação. A solução é consumir continuamente as mensagens postadas no tópico e salvá-las em uma tabela do PostgreSQL.

Para ilustrar o problema, a estratégia foi produzir mensagens aleatórias serializadas como json em um tópico. Abaixo, um exemplo de mensagem gerada.

{
    "message_id": "2268086d-5d3f-40e2-80ce-fc9f4e3008dc",
    "client_id": "8592b337-168e-4304-82d3-0d36b556e58b",
    "products": [
        {
            "sku": "d33b284b-c62e-47d7-b8e8-2048fe2d9da5",
            "price": 100,
            "quantity": 2
        },
        {
            "sku": "aebbb957-e16e-43d2-b742-198c12ef96a8",
            "price": 16387,
            "quantity": 4
        },
        {
            "sku": "9f8c0bfe-5e9b-4822-84b7-7b8e86a6469d",
            "price": 6750,
            "quantity": 2
        },
        {
            "sku": "79c5824b-ac16-4b59-8bda-31328a0204ec",
            "price": 100,
            "quantity": 5
        }
    ]
}

Para geração dessas mensagens, o script abaixo foi utilizado.

def create_sample():
    
    σ_price = 2_000
    μ_price = 10_000

    quantity_range = (1,10)
    products_range = (1,5)

    return {
        'message_id': str(uuid4()),
        'client_id': str(uuid4()),
        'products': [{"sku": str(uuid4()),
                      "price": max(100,
                                   int(σ_price + np.random.randn() * μ_price)),
                      "quantity": np.random.randint(*quantity_range)}
                     for _ in range(np.random.randint(*products_range))]
    } 

As mensagens serão salvas em uma tabela, com dois campos (message_id e client_id) que armazenam dados extraídos da mensagem e uma coluna de content para guardar o conteúdo original do tópico.

create table messages(message_id char(36) primary key, 
                      client_id char(36),
                      datetime timestamp,
                      content text)

Para salvar a mensagem, será utilizada essa função, que faz um parse do json e insere na tabela que foi descrita.

def save_message(conn, msg: str) -> callable:

    content =  json.loads(msg)

    values = (
        content['message_id'],
        content['client_id'],
        datetime.now(),
        msg,
    )

    insert = "insert into messages VALUES (%s, %s, %s, %s)"

    with conn.cursor() as cur:
        cur.execute(insert, values)

    conn.commit()

O problema proposto é I/O bound e facilmente paralelizável, um cenário comum em sistemas de informação e que podemos ter ganhos expressivos de performance com um bom desenho de solução.

Os limites do exactly-once

A primeira opção para resolver esse problema, é utilizar a solução do consumidor exactly-once, substituindo a função process_message pela save_message.

try:
    count = 0

    while count < num_records:
        
        msg = consumer.poll(timeout=1.0)
        if msg is None: continue

        save_message(conn, msg.value().decode())

        count+=1
        consumer.commit()
finally:
    conn.close()

Para inserir 1.000.000 de registros, esse script demorou 47 minutos. O gargalo desse processo é escrever no banco de dados. Removendo a etapa de inserção – mas mantendo o pull das mensagens, parse e formatação para o modelo de dados – o processo demorou 16 segundos.

Fiz esse experimento, para ilustar que esse é um cenário de problemas com o processamento da mensagem. Otimizações no consumo e/ou brokers não fazem sentido, já que essa parte está ocupando uma pequena fração do tempo total de processamento.

Asyncio para remover o gargalo

A lentidão dessa solução é devido a ausência de concorrência, executar esse tipo de trabalho de forma sequencial é um desperdício em dois sentidos:

  • o processo do consumidor fica pausado, enquanto espera salvar no banco de dados;

  • os bancos de dados normalmente são desenhados para lidar bem com escritas concorrentes, escrever vários registros sequencialmente não tira proveito disso.

Existem várias alternativas para implementar concorrência em Python, uma delas é o uso de asyncio para trabalhar com tarefas em segundo plano. Essa estratégia depende do suporte das bibliotecas utilizadas, então nem sempre é possível utilizá-la.

A biblioteca psycopg, utilizada para conexão com o banco de dados, tem suporte a asyncio. A função save_message foi re-escrita de forma assíncrona como asave_message.

async def asave_message(aconn, msg: str):
    
    async with aconn.cursor() as acur:

        content =  json.loads(msg)

        values = (
            content['message_id'],
            content['client_id'],
            datetime.now(),
            msg,
        )

        insert = "insert into messages VALUES (%s, %s, %s, %s)"

        await acur.execute(insert, values)

A programação assíncrona com asyncio tem vários conceitos como tarefas, corotinas, etc – eu mesmo tenho uma compreensão superficial – então recomendo a documentação e outros materiais, caso o leitor queira entender melhor a implementação. Não é necessário compreender a implementação para a leitura do post, mas é interessante entender a ideia do que está acontecendo após essa mudança.

Na versão assíncrona, o processo não espera o término da inserção pra iniciar as demais. Aproveita-se melhor o tempo de CPU e o tempo total de execução é bem reduzido, considerando que o banco de dados lida bem com escritas concorrentes.

SíncronoTempoCPUIOTempoAssíncronoCPUIOCPUIOIOIOIOCPUCPU

Usando execução assíncrona, é preciso tomar cuidado com o overflow de mensagens. Se iniciarmos uma task para cada mensagem que chega, o consumidor pode ter problemas ao lidar com rajadas e cenários de dados represados e/ou reprocessamento.

A estratégia de micro-batches é um jeito fácil de lidar com esse problema, que consiste em gerar pequenos lotes a partir do fluxo contínuo. No código abaixo, a função consume puxa até BACTH_SIZE registros do tópico a cada iteração. A variável timeout é utilizada para configurar a latência máxima: se não houver BATCH_SIZE registros a serem puxados em 1 segundo, um batch menor será criado e executado.

try:

    count = 0

    while count < num_records:
    
        msgs = consumer.consume(BATCH_SIZE, timeout=1.0)
        if len(msgs) == 0: continue
        
        msgs_content = map(lambda x : x.value().decode(), msgs)

        await asave_messages(msgs_content)

        count+=len(msgs)
        consumer.commit()
finally:
    consumer.close()

A função asave_messages é responsável por lançar as tasks de asave_message, o await indica que o loop principal precisa esperar todas as mensagens serem processadas antes de buscar outro lote de mensagens.

async def asave_messages(msgs: list[str]):
    
    aconn = await psycopg.AsyncConnection.connect(db)

    async with aconn:
        async with asyncio.TaskGroup() as tg:
            for msg in msgs:
                tg.create_task(asave_message(aconn, msg))

        await aconn.commit()

O micro-batch é uma estratégia para ter previsibilidade no consumidor, que pode ser configurado e dimensionado de acordo com uma taxa fixa e latência esperada, apenas o broker que precisar lidar com picos de volumetria e eventuais anomalias. Perde-se um pouco em latência, mas é o trade-off esperado em uma arquitetura de eventos.

Nesse cenário, não existe mais uma garantia de exactly-once. Se houver algum erro durante o processamento do batch, o usuário deve optar por não atualizar os offsets e adotar uma semântica at-least-once ou atualizá-los e adotar uma semântica at-most-once. No cenário proposto, faz sentido adotar at-least-once, pois o banco de dados garante idempotência da operação de insert.

Usando BATCH_SIZE=10_000, o tempo total foi de 47 minutos para cerca de 3 minutos

A alternativa multiprocess

O asyncio é a solução desenhada para esse cenário de muito I/O concorrente, mas existem bons motivos para usar outros métodos de implementar concorrência, como multiprocess e threading.

O principal motivo é que a implementação assíncrona depende que as bibliotecas utilizadas a suportem, o que não é tão prevalente no ecossistema Python. Além disso, existem muitos conceitos complexos em programação assíncrona e isso reflete na dificuldade de implementação.

Processamento paralelo em Python é uma questão conteciosa pela existência do GIL, a decisão entre threads e processos normalmente é definida pela natureza do processamento. Sendo um processo I/O bound, eu poderia optar por threads que são mais leves e fáceis de se comunicar, mas optei por paralelizar com processos pelos seguintes motivos:

  • é um cenário clássico de data paralelism, as dificuldades de comunicação entre processos não são releventes nesse contexto;

  • usando pool de processos, o problema do custo extra de criar múltiplos processos é mitigado;

  • prefiro evitar pensar sobre questões de thread safety, vou usar um hack para criar objetos separados por processo e ignorar esse problema;

  • a solução pode ser aplicada em cenários CPU bound, se quiséssemos aplicar modelos de machine learning por exemplo.

A estratégia de usar vários processos, é aproveitar a interrupção do sistema operacional para operar de forma concorrente: quando um processo chega na etapa de I/O, ele é interrompido e outro entra em execução. No caso do asyncio, estamos usando um único processo Python que implementa concorrência usando Tasks.

A maior complicação de usar multiprocess nesse problema, é lidar com a conexão com o banco de dados. Não é possível compartilhar esse tipo de objeto entre processos, mas é contraproducente ficar recriando a conexão a cada iteração.

A solução é criar um objeto por processo, que será utilizado até o fim do batch. Não é a forma mais elegante, mas um jeito é criar uma variável vazia conn e inicializar como global usando a função set_global_conn.

conn = None

def set_global_conn():
    global conn
    conn = psycopg.connect(db)

Abaixo, a implementação do proceso usando set_global_conn na criação do pool de processos.

def save_messages_conn(msgs: list[str]): save_message_batch(conn, msgs)

try:

    count = 0

    while count < num_records:

        msgs = consumer.consume(BATCH_SIZE, timeout=1.0)
    
        if len(msgs) == 0: continue
        
        msgs_content = map(lambda x : x.value().decode(), msgs)

        with Pool(processes=48,
                  initializer=set_global_conn) as pool:

            pool.map(save_message_conn, msgs_content)

        count+=len(msgs)
        consumer.commit()

finally:
    consumer.close()

Em termos de desempenho, essa solução ficou parecida com a implementada utilizando asyncio. Ambos ficaram perto dos 2 minutos, mas o desempenho do multiprocess demanda mais processamento e está condicionada a configuração de núcleos da máquina.

As estratégias aplicadas até o momento, foram para agilizar as operações de I/O, mas podemos reduzir a quantidade de operações também.

Reduzindo o custo fixo

As operações de I/O têm um custo fixo – não importa a quantidade de dados transmitido, sempre é necessário interromper o processamento e lidar com o overhead do protocolo – faz sentido agrupar as operações e dissolver esse custo.

Quando se faz aplicações em lote para trabalhar com banco de dados, é sempre recomendado aplicar estratégias que tirem proveito dessa ideia. Desde ações simples, como não executar o commit para toda linha modificada, seja ações mais agressivas como remover índices durante o processamento e recriá-los posteriormente.

Nesse caso, a ideia é simplesmente agrupar os inserts em pequenos grupos e não chamar commit a cada linha inserida. É bem simples fazer isso com psycopg, basta criar uma lista de valores e usar o comando executemany no cursor.

def save_message_batch(conn, msgs: list[str]):

    values = []

    for msg in msgs:
    
        content =  json.loads(msg)

        values.append((
            content['message_id'],
            content['client_id'],
            datetime.now(),
            msg,
        ))

    insert = "insert into messages VALUES (%s, %s, %s, %s)"

    with conn.cursor() as cur:
        cur.executemany(insert, values)
        
    conn.commit()

No consumidor, é necessário criar esses grupos de mensagens a serem inseridas. Como as mensagens são independentes entre si, os grupos podem ser criados de forma arbitrária aplicando cortes no vetor.

if num_msgs <= BATCH_INSERT:
    chunks = [msgs_content]
else:
    chunks = [
        msgs_content[
            i*BATCH_INSERT:
            (i*BATCH_INSERT)+BATCH_INSERT
        ]
        for i
        in range(num_msgs // BATCH_INSERT + 1)]

Exceto pela criação dos chunks, a estrutura do consumidor fica praticamente igual às demais implementações.

try:

    count = 0

    while count < num_records:
    
        msgs = consumer.consume(BATCH_SIZE, timeout=1.0)
        num_msgs = len(msgs)

        if num_msgs == 0: continue
    
        msgs_content = list(map(lambda x : x.value().decode(), msgs))

        if num_msgs <= BATCH_INSERT:
            chunks = [msgs_content]
        else:
            chunks = [
                msgs_content[
                    i*BATCH_INSERT:
                    (i*BATCH_INSERT)+BATCH_INSERT
                ]
                for i
                in range(num_msgs // BATCH_INSERT + 1)]

        with Pool(processes=48,
                  initializer=set_global_conn) as pool:
            pool.map(save_message_batch_conn, chunks)

        count+=num_msgs
        consumer.commit()

finally:
    consumer.close()

Essa implementação com operações agrupadas, ficou ainda mais performática – usando BATCH_INSERT = 35, BATCH_SIZE = 10_000 e 48 processos – o processo todo demorou 40 segundos para inserir 1.000.000 de registros, o que antes demorava 47 minutos.

Capcioso

Eu não esperava que essas mudanças trouxesse ganhos tão expressivos, mas naturalmente esses resultados dependem de uma miríade de fatores. Para quem quiser executar os experimentos ou adaptar a solução para outro cenário, os códigos estão nesse repositório.

A ideia do post não era discutir esse problema em específico, mas as tomadas de decisão ao construir um consumidor Kafka. Os códigos desenvolvidos não são trabalhosos, nem mesmo complexos. Capciosos, talvez?

São códigos curtos, mas que demandam entendimento de concorrência e paralelismo, conceitos considerados avançados. O truque, de usar uma variável global não inicializada para criar um objeto por processo, é algo simples de implementar. Só que não é óbvio entender o porquê não se pode serializar uma conexão com banco de dados, nem o porquê isso é necessário quando se trabalha com múltiplos processos.

Problemas de engenharia de dados têm essa característica, mas até pela natureza de lidar com grandes volumes, é comum esses gargalos óbvios sejam discutidos em materias introdutórios. No caso do Kafka, essa discussão existe, mas muita mais sobre a infra do broker (e.g partições, réplicas, ZooKeeper, storage).

Imagino que um dos motivos, para não existir tantas discussões sobre a arquitetura do consumo, seja a grande variedade de aplicações. As discussões mudariam completamente se, por exemplo, o problema consistisse de eventos dependentes entre si. Mesmo assim, enxergo que existem padrões a serem seguidos em praticamente qualquer cenário, como a ideia de micro-batch para não ter problemas de overflow e abrir possibilidades de otimização.

Espero ter conseguido passar a ideia, dos pontos de atenção a serem considerados ao desenhar um consumidor. É um post que acabou maior que o esperado, mas o diabo está nos detalhes quando se fala de Goethe Kafka, então achei importante expandir alguns tópicos para além do código e resultados.