Intégrer à OpenLineage

Ce document explique comment intégrer OpenLineage à Knowledge Catalog (anciennement Dataplex Universal Catalog) pour visualiser les données de traçabilité de différents systèmes.

Présentation

OpenLineage est une plate-forme ouverte permettant de collecter et d'analyser des informations sur la traçabilité des données. OpenLineage utilise une norme ouverte pour les données de traçabilité. Il capture les événements de traçabilité à partir des composants du pipeline de données qui utilisent une API OpenLineage pour générer des rapports sur les exécutions, les jobs et les ensembles de données.

Grâce à l'API Data Lineage, vous pouvez importer des événements OpenLineage pour les afficher dans l'interface Web Knowledge Catalog, à côté des informations de traçabilité des servicesGoogle Cloud , tels que BigQuery, Managed Service pour Apache Airflow, Cloud Data Fusion et Managed Service pour Apache Spark.

Pour importer des événements OpenLineage qui utilisent la spécification OpenLineage, utilisez la méthode d'API REST ProcessOpenLineageRunEvent et mappez les facettes OpenLineage aux attributs de l'API Data Lineage.

Limites

  • L'API Data Lineage est compatible avec la version majeure 1 d'OpenLineage.

  • Le point de terminaison de l'API Data Lineage ProcessOpenLineageRunEvent sert uniquement de consommateur de messages OpenLineage, et non de producteur. L'API vous permet d'envoyer des informations de traçabilité générées par n'importe quel outil ou système compatible avec OpenLineage dans Knowledge Catalog. Certains services Google Cloud , tels que Managed Service pour Apache Spark et Managed Airflow, incluent des producteurs OpenLineage intégrés qui peuvent envoyer des événements à ce point de terminaison, ce qui automatise la capture de la traçabilité à partir de ces services.

  • L'API Data Lineage n'est pas compatible avec les éléments suivants :

    • Toute version ultérieure d'OpenLineage avec des modifications du format de message
    • DatasetEvent
    • JobEvent
  • La taille maximale d'un message est de 5 Mo.

  • La longueur de chaque nom complet dans les entrées et les sorties est limitée à 4 000 caractères.

  • Les liens sont regroupés par événements, avec un maximum de 100 liens par événement. Le nombre maximal de liens au niveau du tableau est de 1 000. Si un message contient plus de 1 500 liens au niveau des colonnes, les informations au niveau des colonnes sont ignorées.

  • Knowledge Catalog affiche un graphique de traçabilité pour chaque exécution de job, qui indique les entrées et les sorties des événements de traçabilité. Il n'est pas compatible avec les processus de niveau inférieur tels que les étapes Spark.

Mappage OpenLineage

Pour en savoir plus sur le mappage OpenLineage, consultez Mappage OpenLineage.

Importer un événement OpenLineage

Si vous n'avez pas encore configuré OpenLineage, consultez Premiers pas.

Pour importer un événement OpenLineage dans Knowledge Catalog, appelez la méthode d'API ProcessOpenLineageRunEvent.

C#

C#

Avant d'essayer cet exemple, suivez les instructions de configuration pour C# du guide de démarrage rapide de Knowledge Catalog à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Knowledge Catalog pour C#.

Pour vous authentifier auprès de Knowledge Catalog, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

using Google.Cloud.DataCatalog.Lineage.V1;
using Google.Protobuf.WellKnownTypes;

public sealed partial class GeneratedLineageClientSnippets
{
    /// <summary>Snippet for ProcessOpenLineageRunEvent</summary>
    /// <remarks>
    /// This snippet has been automatically generated and should be regarded as a code template only.
    /// It will require modifications to work:
    /// - It may require correct/in-range values for request initialization.
    /// - It may require specifying regional endpoints when creating the service client as shown in
    ///   https://cloud.google.com/dotnet/docs/reference/help/client-configuration#endpoint.
    /// </remarks>
    public void ProcessOpenLineageRunEventRequestObject()
    {
        // Create client
        LineageClient lineageClient = LineageClient.Create();
        // Initialize request argument(s)
        ProcessOpenLineageRunEventRequest request = new ProcessOpenLineageRunEventRequest
        {
            Parent = "",
            OpenLineage = new Struct(),
        };
        // Make the request
        ProcessOpenLineageRunEventResponse response = lineageClient.ProcessOpenLineageRunEvent(request);
    }
}

Go

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go du guide de démarrage rapide de Knowledge Catalog à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Knowledge Catalog pour Go.

Pour vous authentifier auprès de Knowledge Catalog, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


//go:build examples

package main

import (
	"context"

	lineage "cloud.google.com/go/datacatalog/lineage/apiv1"
	lineagepb "cloud.google.com/go/datacatalog/lineage/apiv1/lineagepb"
)

func main() {
	ctx := context.Background()
	// This snippet has been automatically generated and should be regarded as a code template only.
	// It will require modifications to work:
	// - It may require correct/in-range values for request initialization.
	// - It may require specifying regional endpoints when creating the service client as shown in:
	//   https://pkg.go.dev/cloud.google.com/go#hdr-Client_Options
	c, err := lineage.NewClient(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	defer c.Close()

	req := &lineagepb.ProcessOpenLineageRunEventRequest{
		// TODO: Fill request struct fields.
		// See https://pkg.go.dev/cloud.google.com/go/datacatalog/lineage/apiv1/lineagepb#ProcessOpenLineageRunEventRequest.
	}
	resp, err := c.ProcessOpenLineageRunEvent(ctx, req)
	if err != nil {
		// TODO: Handle error.
	}
	// TODO: Use resp.
	_ = resp
}

Java

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java du guide de démarrage rapide de Knowledge Catalog à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Knowledge Catalog pour Java.

Pour vous authentifier auprès de Knowledge Catalog, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.cloud.datacatalog.lineage.v1.LineageClient;
import com.google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEventRequest;
import com.google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEventResponse;
import com.google.protobuf.Struct;

public class SyncProcessOpenLineageRunEvent {

  public static void main(String[] args) throws Exception {
    syncProcessOpenLineageRunEvent();
  }

  public static void syncProcessOpenLineageRunEvent() throws Exception {
    // This snippet has been automatically generated and should be regarded as a code template only.
    // It will require modifications to work:
    // - It may require correct/in-range values for request initialization.
    // - It may require specifying regional endpoints when creating the service client as shown in
    // https://cloud.google.com/java/docs/setup#configure_endpoints_for_the_client_library
    try (LineageClient lineageClient = LineageClient.create()) {
      ProcessOpenLineageRunEventRequest request =
          ProcessOpenLineageRunEventRequest.newBuilder()
              .setParent("parent-995424086")
              .setOpenLineage(Struct.newBuilder().build())
              .setRequestId("requestId693933066")
              .build();
      ProcessOpenLineageRunEventResponse response =
          lineageClient.processOpenLineageRunEvent(request);
    }
  }
}

Python

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python du guide de démarrage rapide de Knowledge Catalog à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Knowledge Catalog pour Python.

Pour vous authentifier auprès de Knowledge Catalog, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

# This snippet has been automatically generated and should be regarded as a
# code template only.
# It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
#   client as shown in:
#   https://googleapis.dev/python/google-api-core/latest/client_options.html
from google.cloud import datacatalog_lineage_v1


def sample_process_open_lineage_run_event():
    # Create a client
    client = datacatalog_lineage_v1.LineageClient()

    # Initialize request argument(s)
    request = datacatalog_lineage_v1.ProcessOpenLineageRunEventRequest(
        parent="parent_value",
    )

    # Make the request
    response = client.process_open_lineage_run_event(request=request)

    # Handle the response
    print(response)

Ruby

Ruby

Avant d'essayer cet exemple, suivez les instructions de configuration pour Ruby du guide de démarrage rapide de Knowledge Catalog à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Knowledge Catalog pour Ruby.

Pour vous authentifier auprès de Knowledge Catalog, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

require "google/cloud/data_catalog/lineage/v1"

##
# Snippet for the process_open_lineage_run_event call in the Lineage service
#
# This snippet has been automatically generated and should be regarded as a code
# template only. It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
# client as shown in https://cloud.google.com/ruby/docs/reference.
#
# This is an auto-generated example demonstrating basic usage of
# Google::Cloud::DataCatalog::Lineage::V1::Lineage::Client#process_open_lineage_run_event.
#
def process_open_lineage_run_event
  # Create a client object. The client can be reused for multiple calls.
  client = Google::Cloud::DataCatalog::Lineage::V1::Lineage::Client.new

  # Create a request. To set request fields, pass in keyword arguments.
  request = Google::Cloud::DataCatalog::Lineage::V1::ProcessOpenLineageRunEventRequest.new

  # Call the process_open_lineage_run_event method.
  result = client.process_open_lineage_run_event request

  # The returned object is of type Google::Cloud::DataCatalog::Lineage::V1::ProcessOpenLineageRunEventResponse.
  p result
end

REST

Pour importer un événement OpenLineage, utilisez la méthode processOpenLineageRunEvent.

Avant d'utiliser les données de requête, effectuez les remplacements suivants :

  • PROJECT_ID : ID de votre projet Google Cloud .
  • LOCATION_ID : emplacement Google Cloud , par exemple us-central1.

Méthode HTTP et URL :

POST https://datalineage.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID:processOpenLineageRunEvent

Corps JSON de la requête :

{
  "eventTime": "2023-04-04T13:21:16.098Z",
  "eventType": "COMPLETE",
  "inputs": [
    {
      "name": "somename",
      "namespace": "customnamespace"
    }
  ],
  "job": {
    "name": "somename",
    "namespace": "customnamespace"
  },
  "outputs": [
    {
      "name": "somename",
      "namespace": "customnamespace"
    }
  ],
  "producer": "someproducer",
  "run": {
    "runId": "somerunid"
  },
  "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
}

Pour envoyer votre requête, développez l'une des options suivantes :

Vous devriez recevoir une réponse JSON de ce type :

{
  "process": "projects/my-project/locations/us-central1/processes/my-process",
  "run": "projects/my-project/locations/us-central1/processes/my-process/runs/my-run",
  "lineageEvents": [
    "projects/my-project/locations/us-central1/processes/my-process/runs/my-run/lineageEvents/my-lineage-event"
  ]
}

Outils pour envoyer des messages OpenLineage

Pour simplifier l'envoi d'événements à l'API Data Lineage, vous pouvez utiliser différents outils et bibliothèques :

  • Bibliothèque de production Java Google Cloud : Google fournit une bibliothèque Java Open Source pour vous aider à créer et à envoyer des événements OpenLineage à l'API Data Lineage. Pour en savoir plus, consultez l'article de blog La bibliothèque de production Java pour Data Lineage est désormais Open Source. La bibliothèque est disponible sur GitHub et Maven.
  • Transport OpenLineage GCP : un transport GcpLineage dédié est disponible pour les producteurs OpenLineage basés sur Java. Il simplifie l'intégration à l'API Data Lineage en minimisant le code nécessaire pour envoyer des événements à l'API Data Lineage. GcpLineageTransport peut être configuré comme récepteur d'événements pour tout producteur OpenLineage existant, tel qu'Airflow, Spark et Flink. Pour en savoir plus et obtenir des exemples, consultez GcpLineage.

Analyser les informations d'OpenLineage

Pour analyser les événements OpenLineage importés, consultez Afficher les graphiques de traçabilité dans l'UI Knowledge Catalog.

Données stockées

L'API Data Lineage ne stocke pas toutes les données de facettes des messages OpenLineage. L'API Data Lineage stocke les champs de facette suivants :

  • spark_version
    • openlineage-spark-version
    • spark-version
  • toutes les spark.logicalPlan.*
  • environment-properties (facette de lignée personnalisée Google Cloud )
    • origin.sourcetype et origin.name
    • spark.app.id
    • spark.app.name
    • spark.batch.id
    • spark.batch.uuid
    • spark.cluster.name
    • spark.cluster.region
    • spark.job.id
    • spark.job.uuid
    • spark.project.id
    • spark.query.node.name
    • spark.session.id
    • spark.session.uuid

L'API Data Lineage stocke les informations suivantes :

  • eventTime
  • run.runId
  • job.namespace
  • job.name

Étapes suivantes