Relevant Kafka messages consist of a model’s input features, associated metadata (referred to as tags in the Arize platform), and its predictions.
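For illustration, a single event on such a topic might carry a JSON payload like the sketch below. The prediction_* fields correspond to the reserved columns used in the consumer example further down; the remaining field names are hypothetical stand-ins for a model’s features, and tag columns could be carried the same way and declared separately in the schema.

# A hypothetical inference event as it might be produced onto the topic
# (JSON-encoded with the same utf-8 encoding the consumer example decodes).
# The prediction_* fields are the reserved columns; the rest stand in for features.
example_event = {
    "prediction_id": "2f6c5a1e-7d2b-4a9e-9c1d-0b3f8e6a4d21",
    "prediction_ts": 1700000000,          # Unix timestamp of the prediction
    "prediction_label": "fraud",          # predicted class
    "prediction_score": 0.87,             # predicted score
    "merchant_type": "online_retail",     # feature
    "transaction_amount": 249.99,         # feature
    "account_age_days": 412,              # feature
}
# json.dumps(example_event).encode("utf-8") is what the producer would write to the topic.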
Example
In the example below, the consumer polls a micro-batch of up to 100,000 events or waits up to 10 seconds, whichever limit is reached first. Both limits are configurable and can be adjusted for your use case.
The consumer reads these events, deserializes them, and batches them together before sending them over the wire to the Arize platform. This micro-batching amortizes the per-request round-trip overhead.
Note that automatic partition offset commits are disabled, and the offsets are committed manually only after the data has been persisted at the Arize edge. This ensures no data is lost in the case of a service disruption at any step of the process.
Kafka consumers can be set to consume all partitions of a topic or, for larger clusters, assigned specific partitions. Assigning partitions lets multiple consumer instances run in parallel, further increasing throughput for very large workloads so that data is ingested in real time with minimal latency; a sketch of this follows the example below.
from kafka import KafkaConsumer, OffsetAndMetadata
import json
import pandas as pd
from arize.pandas.logger import Client, Schema
from arize.utils.types import ModelTypes, Environments

# List the broker endpoints the consumer will listen to
BROKER_LIST = ["localhost:9092"]
TOPIC = "arize-inference-stream"
deser = lambda x: json.loads(x.decode("utf-8"))

# Disable enable_auto_commit so offsets are only committed once the data
# has been fully persisted
consumer = KafkaConsumer(
    bootstrap_servers=BROKER_LIST,
    value_deserializer=deser,
    group_id="Arize-Consumer",
    enable_auto_commit=False,
)
consumer.subscribe([TOPIC])

# Instantiate the Arize client with your space credentials
arize_client = Client(
    space_id="ARIZE_SPACE_ID",
    api_key="ARIZE_API_KEY",
)

while True:
    # Poll a micro-batch of up to 100,000 records or up to 10 seconds,
    # whichever comes first
    records = consumer.poll(
        timeout_ms=10_000,
        max_records=100_000,
    )
    for partition, messages in records.items():
        # Load the messages into a pandas DataFrame for fast streaming into Arize
        inferences = pd.DataFrame([msg.value for msg in messages])
        reserved_columns = [
            "prediction_label",
            "prediction_score",
            "prediction_id",
            "prediction_ts",
        ]
        # Define the schema for your model; every non-reserved column is a feature
        schema = Schema(
            prediction_id_column_name="prediction_id",
            timestamp_column_name="prediction_ts",
            feature_column_names=inferences.drop(columns=reserved_columns).columns.tolist(),
            prediction_label_column_name="prediction_label",
            prediction_score_column_name="prediction_score",
        )
        # Log inferences to Arize
        res = arize_client.log(
            dataframe=inferences,
            model_id="model_id",
            model_version="model_version",
            model_type=ModelTypes.SCORE_CATEGORICAL,
            environment=Environments.PRODUCTION,
            schema=schema,
        )
        # Only after the data has been successfully sent to Arize do we commit
        # the offsets for this partition
        if res.status_code == 200:
            options = {partition: OffsetAndMetadata(messages[-1].offset + 1, "")}
            consumer.commit(options)
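To consume specific partitions rather than the whole topic, the subscribe call above can be replaced with an explicit assignment. A minimal sketch, assuming kafka-python and an illustrative PARTITION_IDS list supplied to each consumer instance:

from kafka import KafkaConsumer, TopicPartition

# Each consumer instance handles an explicit subset of the topic's partitions,
# e.g. instance 0 takes partitions [0, 1], instance 1 takes [2, 3], and so on.
PARTITION_IDS = [0, 1]  # illustrative; supply per consumer instance

consumer = KafkaConsumer(
    bootstrap_servers=BROKER_LIST,
    value_deserializer=deser,
    group_id="Arize-Consumer",
    enable_auto_commit=False,
)
# assign() replaces subscribe(): this consumer reads only the listed partitions
consumer.assign([TopicPartition(TOPIC, p) for p in PARTITION_IDS])

Each instance then runs the same poll / log / commit loop shown above over only its assigned partitions.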