Kafka

The Arize Pandas SDK can be used to consume micro-batches of predictions from Kafka and send them to Arize, enabling real-time observability.

Overview

Relevant Kafka messages consist of a model’s input features, its relevant metadata (referred to as tags in the Arize platform), and the predictions.
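As an illustration, a message on the inference topic might carry a JSON payload like the one below. The feature and tag field names are hypothetical; only the reserved prediction columns used in the example further down are assumed.

```python
import json

# A hypothetical inference event; feature names are illustrative
event = {
    "prediction_id": "abc-123",
    "prediction_ts": 1700000000,      # Unix epoch seconds
    "prediction_label": "not_fraud",
    "prediction_score": 0.87,
    "merchant_type": "electronics",   # feature
    "transaction_amount": 249.99,     # feature
}

# Serialized by the producer, then decoded by the consumer's deserializer
payload = json.dumps(event).encode("utf-8")
decoded = json.loads(payload.decode("utf-8"))
```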
Figure: visualization of Arize fitting into a Kafka pipeline.

Example

In the example below, Arize consumes micro-batches of up to 100,000 events or up to 10 seconds of data, whichever comes first; both limits can be adjusted for your use case.
The consumer reads these events, deserializes them, and batches them together before sending them over the wire to the Arize platform. This micro-batching amortizes round-trip overhead.
Note that automatic partition offset commits are disabled; the consumer commits offsets manually only after the data has been persisted at the Arize edge. This ensures no data is lost if the service is disrupted at any step of the process.
Kafka consumers can consume all partitions on a topic, or, for larger clusters, be assigned specific partitions. Partition assignment parallelizes the consumers and further increases throughput for truly large data processing, keeping ingestion real time and minimizing potential latency.
from kafka import KafkaConsumer, OffsetAndMetadata
import json
import pandas as pd
from arize.pandas.logger import Client, Schema
from arize.utils.types import ModelTypes, Environments

# List the broker endpoints the consumer will listen to
BROKER_LIST = ["localhost:9092"]
TOPIC = "arize-inference-stream"

deser = lambda x: json.loads(x.decode("utf-8"))

# Disable enable_auto_commit to ensure data is fully persisted
# prior to moving on to the next transaction
consumer = KafkaConsumer(
    bootstrap_servers=BROKER_LIST,
    value_deserializer=deser,
    group_id="arize-consumer",
    enable_auto_commit=False,
)
consumer.subscribe([TOPIC])

# Instantiate the Arize client with your space credentials
arize_client = Client(
    space_key="ARIZE_SPACE_KEY",
    api_key="ARIZE_API_KEY",
)

# Columns reserved by the Arize schema; every other column is a feature
reserved_columns = [
    "prediction_label",
    "prediction_score",
    "prediction_id",
    "prediction_ts",
]

while True:
    # Poll a micro-batch of up to 100,000 records or up to 10 seconds of data
    records = consumer.poll(
        timeout_ms=10_000,
        max_records=100_000,
    )
    for partition, messages in records.items():
        # Collect the messages into a pandas DataFrame for fast streaming into Arize
        inferences = pd.DataFrame([msg.value for msg in messages])

        # Define the schema for your model
        schema = Schema(
            prediction_id_column_name="prediction_id",
            timestamp_column_name="prediction_ts",
            feature_column_names=inferences.drop(columns=reserved_columns).columns.tolist(),
            prediction_label_column_name="prediction_label",
            prediction_score_column_name="prediction_score",
        )

        # Log inferences to Arize
        res = arize_client.log(
            dataframe=inferences,
            model_id="model_id",
            model_version="model_version",
            model_type=ModelTypes.SCORE_CATEGORICAL,
            environment=Environments.PRODUCTION,
            schema=schema,
        )

        # Once data is successfully sent to Arize, commit the offset of the
        # last message consumed from this partition
        consumer.commit({partition: OffsetAndMetadata(messages[-1].offset + 1, "")})
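To consume specific partitions instead of subscribing to the whole topic, kafka-python's consumer.assign() accepts a list of TopicPartition objects. One simple (hypothetical) way to split a topic's partitions round-robin across several consumer instances is sketched below; the helper and its parameters are illustrative, not part of the Arize SDK.

```python
def partitions_for_worker(num_partitions, worker_index, num_workers):
    """Round-robin split of a topic's partitions across worker instances."""
    return [p for p in range(num_partitions) if p % num_workers == worker_index]

# Worker 0 of 2 over an 8-partition topic handles partitions 0, 2, 4, 6.
# Each worker instance would then call, for example:
#   consumer.assign([TopicPartition(TOPIC, p)
#                    for p in partitions_for_worker(8, worker_index, 2)])
print(partitions_for_worker(8, 0, 2))
```

Note that assign() and subscribe() are mutually exclusive on a consumer: an assigned consumer manages its own partition list and does not participate in consumer-group rebalancing.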
Questions? Email us at [email protected] or Slack us in the #arize-support channel