Table Importer API

Learn how to create, pause, resume, and delete table import jobs, and update table ingestion parameters programmatically using the GraphQL API.

Use the table importer API to programmatically connect your data warehouse with the Arize platform.

For a brief overview of GraphQL itself, please consult our introduction.

Creating Import Jobs

Before you create an import job, you must configure the necessary permissions and access for Snowflake, Databricks, or BigQuery.

Import jobs are localized to a Space. To create an import job, retrieve your space ID from the platform URL:

/organizations/:organization_id/spaces/:space_id
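
As a small illustration of the URL shape above, the following Python sketch pulls the space ID out of a copied platform URL. The function name `space_id_from_url` and the host `example.com` are hypothetical; only the path pattern comes from this page.

```python
import re
from urllib.parse import urlparse

# Path shape taken from the text above:
# /organizations/:organization_id/spaces/:space_id
_SPACE_PATH = re.compile(r"/organizations/(?P<org>[^/]+)/spaces/(?P<space>[^/?#]+)")


def space_id_from_url(url: str) -> str:
    """Return the space ID segment of a platform URL, or raise ValueError."""
    match = _SPACE_PATH.search(urlparse(url).path)
    if match is None:
        raise ValueError(f"no space ID found in URL: {url!r}")
    return match.group("space")


# Hypothetical URL, used only to show the call.
print(space_id_from_url("https://example.com/organizations/org123/spaces/sp456"))
```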

Import Job Tips:

  • Create a reusable mutation to easily configure multiple table import jobs

  • Validate your table import job before you actually create it by setting the dryRun parameter in the create mutation

    • dryRun (boolean): If true, Arize will attempt the import job without writing any changes. Use validationResult to check the validation status

mutation dryRunTableImportJob($input: CreateTableImportJobInput!) {
  createTableImportJob(input: $input) {
    validationResult {
      validationStatus
      error {
        message
      }
    }
  }
}

variables:

{
  "input": {
    "spaceId": "space_id",
    "modelName": "My Test Model",
    "modelType": "score_categorical",
    "modelEnvironmentName": "production",
    "tableStore": "BigQuery",
    "bigQueryTableConfig": {
      "projectId": "test-project-id",
      "dataset": "test-dataset",
      "tableName": "test-table"
    },
    "schema": {
      "predictionId": "prediction_id",
      "predictionLabel": "prediction",
      "timestamp": "prediction_ts",
      "features": "feature_",
      "actualLabel": "actual",
      "version": "model_version",
      "tags": "tag_",
      "changeTimestamp": "partition_timestamp"
    },
    "dryRun": true
  }
}

Alternatively, you may pass in the inputs directly:

mutation {
  createTableImportJob(input: {
    spaceId: "space_id",
    modelName: "My Test Model",
    modelType: score_categorical,
    modelEnvironmentName: production,
    tableStore: BigQuery,
    bigQueryTableConfig: {
      projectId: "test-project-id",
      dataset: "test-dataset",
      tableName: "test-table"
    },
    schema: {
      predictionId: "prediction_id",
      predictionLabel: "prediction",
      timestamp: "prediction_ts",
      features: "feature_",
      actualLabel: "actual",
      version: "model_version",
      tags: "tag_",
      changeTimestamp: "partition_timestamp"
    },
    dryRun: true
  }){
    validationResult {
      validationStatus
      error {
        message
      }
    }
  }
}

The variables in the examples above show one possible mapping for a BigQuery table.

Learn how to map your data with each warehouse for Snowflake, Databricks, and BigQuery. See the GraphQL docs for a list of all the available fields for each query or mutation.
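
The tip about a reusable mutation can be sketched in Python as follows. This only assembles the JSON request body (the usual `query` plus `variables` shape for GraphQL over HTTP) and reads back the validation result; it does not send anything. The helper names are hypothetical, the fixed BigQuery values are the placeholders from the example above, and treating a null `error` as "validation passed" is an assumption rather than documented behavior.

```python
import json

# Same mutation document as the dry-run example above.
DRY_RUN_MUTATION = """
mutation dryRunTableImportJob($input: CreateTableImportJobInput!) {
  createTableImportJob(input: $input) {
    validationResult {
      validationStatus
      error {
        message
      }
    }
  }
}
"""


def build_dry_run_payload(space_id, model_name, table_name, schema):
    """Build the JSON body for a dry-run createTableImportJob request."""
    variables = {
        "input": {
            "spaceId": space_id,
            "modelName": model_name,
            "modelType": "score_categorical",
            "modelEnvironmentName": "production",
            "tableStore": "BigQuery",
            "bigQueryTableConfig": {
                "projectId": "test-project-id",
                "dataset": "test-dataset",
                "tableName": table_name,
            },
            "schema": schema,
            "dryRun": True,
        }
    }
    return {"query": DRY_RUN_MUTATION, "variables": variables}


def validation_error(response):
    """Return the validation error message from a response dict, or None.

    ASSUMPTION: `error` is null (None) when validation succeeds.
    """
    result = response["data"]["createTableImportJob"]["validationResult"]
    error = result.get("error")
    return error["message"] if error else None


# Reuse one schema mapping for several tables.
schema = {"predictionId": "prediction_id", "timestamp": "prediction_ts"}
for table in ("test-table", "test-table-2"):
    payload = build_dry_run_payload("space_id", "My Test Model", table, schema)
    print(json.dumps(payload["variables"]["input"]["bigQueryTableConfig"]))
```

Keeping the mutation text constant and varying only the variables is what makes the request reusable across tables.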

Querying for Import Jobs

You can query for table import jobs using a Space node.

query {
  node(id: "space_id") {
    ... on Space {
      tableJobs(first: 50) {
        edges {
          node {
            id
            modelName
            jobStatus
            totalQueriesFailedCount
            totalQueriesSuccessCount
          }
        }
      }
    }
  }
}

If you have a large number of import jobs, paginate through the results to retrieve the complete list, either to view it or to use it in other queries and mutations.
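
A minimal Python sketch of paginating through jobs is shown below. It assumes that `tableJobs` follows the Relay cursor convention (an `after` argument and `pageInfo { hasNextPage endCursor }`); those fields do not appear on this page, so confirm them in the GraphQL docs first. The `execute` callable is a hypothetical stand-in for whatever client sends the request and returns the response's `data` object.

```python
from typing import Callable, Dict, Iterator

# ASSUMPTION: Relay-style cursor pagination (`after`, `pageInfo`).
# The source page only shows `tableJobs(first: 50)` with `edges`.
JOBS_PAGE_QUERY = """
query ($spaceId: ID!, $first: Int!, $after: String) {
  node(id: $spaceId) {
    ... on Space {
      tableJobs(first: $first, after: $after) {
        pageInfo { hasNextPage endCursor }
        edges { node { id modelName jobStatus } }
      }
    }
  }
}
"""


def iter_table_jobs(execute: Callable[[str, Dict], Dict], space_id: str,
                    page_size: int = 50) -> Iterator[Dict]:
    """Yield every job node, following cursors until the last page.

    `execute(query, variables)` is supplied by the caller and must return
    the `data` dict of the GraphQL response.
    """
    cursor = None
    while True:
        data = execute(JOBS_PAGE_QUERY,
                       {"spaceId": space_id, "first": page_size, "after": cursor})
        connection = data["node"]["tableJobs"]
        for edge in connection["edges"]:
            yield edge["node"]
        page_info = connection["pageInfo"]
        if not page_info["hasNextPage"]:
            return
        cursor = page_info["endCursor"]
```

Passing the transport in as a function keeps the paging logic independent of any particular HTTP client or authentication scheme.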

Query for fields directly from a specific import job using the TableImportJob node and ID.

To view the queries for a given import job, use the TableImportJobQueryConnection, which returns the total number of queries and information about each individual query.

query {
  node(id: "ID") {
    ... on TableImportJob {
      queries(first: 10) {
        totalCount
        edges{
          node {
            queryId
            createdAt
            recordsProcessedCount
            windowStartTimestamp
            windowEndTimestamp
          }
        }
      }
    }
  }
}
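
Once the response to the query above is in hand, a short Python helper can flatten the connection into a summary. This is a sketch under assumptions: the function name is hypothetical, `data` is the `data` object of the GraphQL response, and `recordsProcessedCount` is assumed to be numeric and possibly null.

```python
def summarize_queries(data):
    """Summarize the `queries` connection of a TableImportJob response."""
    connection = data["node"]["queries"]
    nodes = [edge["node"] for edge in connection["edges"]]
    return {
        # totalCount covers all queries, not only the fetched page.
        "total_queries": connection["totalCount"],
        "fetched_queries": len(nodes),
        # ASSUMPTION: recordsProcessedCount is a number or null.
        "records_processed": sum(n.get("recordsProcessedCount") or 0 for n in nodes),
    }
```

Because the example query requests only the first 10 queries, `fetched_queries` can be smaller than `total_queries`.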

Pausing Import Jobs

Pause an existing table import job with the pauseTableImportJob mutation and the job ID.

mutation {
  pauseTableImportJob(input: {jobId: "ID"}) {
    tableImportJob {
      jobStatus
    }
  }
}

Resuming Import Jobs

Resume a paused import job by using the startTableImportJob mutation and job ID.

mutation {
  startTableImportJob(input: {jobId: "ID"}) {
    tableImportJob {
      jobStatus
    }
  }
}
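
Since the pause and resume mutations differ only in their name, a script can generate either document from one template. The Python sketch below does this; the helper name and the `"pause"`/`"resume"` action labels are hypothetical, while the mutation names and selected fields are the ones shown above.

```python
import json

# Mutation names taken from the examples above.
_JOB_MUTATIONS = {"pause": "pauseTableImportJob", "resume": "startTableImportJob"}


def build_job_status_mutation(action: str, job_id: str) -> str:
    """Return the GraphQL document that pauses or resumes a job."""
    if action not in _JOB_MUTATIONS:
        raise ValueError(f"unknown action: {action!r}")
    # json.dumps yields a double-quoted, escaped string literal, which is
    # also a valid GraphQL string for ordinary IDs.
    job_literal = json.dumps(job_id)
    return (
        "mutation {\n"
        f"  {_JOB_MUTATIONS[action]}(input: {{jobId: {job_literal}}}) {{\n"
        "    tableImportJob {\n"
        "      jobStatus\n"
        "    }\n"
        "  }\n"
        "}"
    )


print(build_job_status_mutation("pause", "ID"))
```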

Event-Based Table Import Jobs

Available for Snowflake only.

If you are connecting to a warehouse or table for the first time, please complete the one-time permissions setup. For assistance, contact support@arize.com.

Step 1: Create Table Job

To enable event-driven ingestion, set up the table connection and initialize the job by running the following mutation:

mutation CreateTableJob {
  createTriggeredOngoingTableImportJob(input: {
    spaceId: "space_id",
    modelName: "model-name",
    modelType: score_categorical,
    modelEnvironmentName: production,
    tableStore: Snowflake,
    snowflakeTableConfig: {
      accountID: "snowflake-account-id",
      schema: "snowflake-schema",
      database: "database-name",
      tableName: "table-name"
    },
    schema: {
      predictionId: "PRED_ID",
      predictionLabel: "PRED_LABEL",
      timestamp: "TIMESTAMP",
      featuresList: [],
      tagsList: [],
      changeTimestamp: "CHANGE_TIMESTAMP"
    }
  }) {
    tableImportJob {
      jobId
      jobStatus
    }
    space {
      id
    }
    validationResult {
      error {
        code
        message
      }
      validationStatus
    }
  }
}

Step 2: Query for Job ID

Once the job is created, query for the jobId by running the following query:

query queryTableJobId {
  node(id: "spaceId") {
    ... on Space {
      tableJobs(first: 1){
        edges{
          node{
            id
            jobId
          }
        }
      }
    }
  }
}

Step 3: Trigger a query

After the job is created, trigger a query for data within the specified time range using the following mutation:

mutation TriggerTableRun {
  createTriggeredOngoingTableRun(input: {
    jobId: "<ID from above query>",
    queryStart: "2024-08-20T01:00:00Z", # desired start time for the query
    queryEnd: "2024-08-20T23:00:00Z" # desired end time for the query
  }) {
    clientMutationId
  }
}
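
The `queryStart` and `queryEnd` values in the mutation above are UTC timestamps ending in `Z`. The Python sketch below shows one way to produce strings in that format from timezone-aware datetimes. The helper names are hypothetical, and whether the API accepts other timestamp formats is not stated on this page.

```python
from datetime import datetime, timedelta, timezone


def to_query_timestamp(moment: datetime) -> str:
    """Format an aware datetime as a UTC string like 2024-08-20T01:00:00Z."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid ambiguity")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def run_window(end: datetime, hours: int) -> dict:
    """Return queryStart/queryEnd values covering the `hours` before `end`."""
    if hours <= 0:
        raise ValueError("hours must be positive")
    return {
        "queryStart": to_query_timestamp(end - timedelta(hours=hours)),
        "queryEnd": to_query_timestamp(end),
    }


# Reproduces the window used in the mutation above.
print(run_window(datetime(2024, 8, 20, 23, 0, tzinfo=timezone.utc), hours=22))
```

Rejecting naive datetimes avoids silently shifting the window by the local UTC offset.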

Deleting Import Jobs

Delete an import job by using the deleteTableImportJob mutation and job ID.

mutation {
  deleteTableImportJob(input: {jobId: "ID"}) {
    tableImportJob {
      jobStatus
    }
  }
}

Updating Table Ingestion Parameters

Update table ingestion parameters for a job using the updateTableIngestionParameters mutation and job ID.

mutation {
  updateTableIngestionParameters(input: {
    jobId: "job_id",
    tableIngestionParameters: {
      rowLimit: 50000,
      refreshIntervalMinutes: 60,
      queryWindowSizeHours: 24
    }
  }) {
    tableImportJob {
      id
    }
  }
}

Updating Table Ingestion Schema

Update the schema used for a table import job using the updateTableImportJob mutation and job ID.

mutation updateTableImportJob(
  $jobId: ID!,
  $schema: TableImportSchemaInputType!,
  $tableIngestionParameters: TableIngestionParametersInputType!
) {
  updateTableImportJob(input: {
    jobId: $jobId,
    schema: $schema,
    tableIngestionParameters: $tableIngestionParameters
  }) {
    tableImportJob {
      id
    }
  }
}
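
The mutation above declares three variables but the page does not show a matching variables object. The Python sketch below assembles one. It assumes that the schema input accepts the same keys as the schema in the create example and that the ingestion parameters match the updateTableIngestionParameters example; both are assumptions to check against the GraphQL docs, and the helper name is hypothetical.

```python
import json


def build_update_variables(job_id, schema, ingestion_parameters):
    """Return the variables object for the updateTableImportJob mutation."""
    return {
        "jobId": job_id,
        "schema": schema,
        "tableIngestionParameters": ingestion_parameters,
    }


variables = build_update_variables(
    "job_id",
    # ASSUMPTION: same schema keys as in the create example.
    {
        "predictionId": "prediction_id",
        "predictionLabel": "prediction",
        "timestamp": "prediction_ts",
        "changeTimestamp": "partition_timestamp",
    },
    # Parameter names reuse the updateTableIngestionParameters example.
    {"rowLimit": 50000, "refreshIntervalMinutes": 60, "queryWindowSizeHours": 24},
)
print(json.dumps(variables, indent=2))
```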

Copyright © 2023 Arize AI, Inc