Kafka

The Arize Pandas SDK can be utilized to consumes micro-batches of predictions to Arize to enable real time observability.

Overview

Relevant Kafka messages consist of a model’s input features, its relevant metadata (referred to as tags in the Arize platform), and the predictions.

Example

In the example below, Arize consumes a microbatch of up to 100,000 events or up to 10 seconds. This is a configuration that can be adjusted based on your use case.

Arize consumes these events, deserialize them, and batch them together prior to sending them over the wire into the Arize platform. This micro-batching amortizes any round-trip overhead.

Note that the automatic partition offset commits are disabled and manually commit the offsets after data has been persisted at the Arize edge. This ensures no data loss in the case of service disruption in any step of the process.

Kafka consumers can be set to consume all partitions on a topic, or for larger clusters can be set to consume specific partitions – allowing for parallelization of the consumer and further increasing throughput for truly large data processing, ensuring data is always ingested in real time and minimizing any potential latency.

from kafka import KafkaConsumer, OffsetAndMetadata
import json
import pandas as pd

from arize.pandas.logger import Client, Schema
from arize.utils.types import ModelTypes, Environments

# list the Broker’s endpoints where the consumer will be listening to

BROKER_LIST = ["localhost: 9092"]
TOPIC = "arize - inference - stream"
deser = lambda x: json.loads(x.decode("utf - 8"))

# disable enable_auto_commit to ensure data is fully persisted prior to moving on to the next transaction

consumer = KafkaConsumer(
    bootstrap_servers=BROKER_LIST,
    value_deserializer=deser,
    group_id="Arize - Consumer",
    enable_auto_commit=False,
)

consumer.subscribe([TOPIC])

# instantiate the Arize Client with your Space Credentials

arize_client = Client(
    space_id="ARIZE_SPACE_ID",
    api_key="ARIZE_API_KEY",
)
while True:
    record = consumer.poll(
        timeout_ms=10_000,
        max_records=100_000,
    )
    for k, v in record.items():
        # Set the messages into a pandas dataframe for fast streaming into Arize
        inferences = pd.DataFrame([row.value for row in v])
        reserved_columns = [
            "prediction_label",
            "prediction_score",
            "prediction_id",
            "prediction_ts",
        ]

        # Define the schema for your model
        schema = Schema(
            prediction_id_column_name="prediction_id",
            timestamp_column_name="prediction_ts",
            feature_column_names=inferences.drop(columns=reserved_columns).tolist(),
            prediction_label_column_name="prediction_label",
            prediction_score_column_name="prediction_score",
        )

        # Log inferences to Arize
        res = arize_client.log(
            dataframe=inferences,
            model_id="model_id",
            model_version="model_version",
            model_type=ModelTypes.SCORE_CATEGORICAL,
            environment=Environments.PRODUCTION,
            schema=schema,
        )

        # Once data is successfully sent to Arize, then we commit the consumer offsets.
        meta = consumer.partitions_for_topic(TOPIC)
        options = {record.partition: OffsetAndMetadata(record.offset + 1, meta)}
        consumer.commit(options)

Questions? Email us at support@arize.com or Slack us in the #arize-support channel

Last updated

Copyright © 2023 Arize AI, Inc