But First, Semantics: Upsert versus Patch
But First, Semantics: Upsert versus Patch
Why Would You Use Patch
By default, most of the SDK tutorials and API-s involve applying full upserts at the aspect level. This means that typically, when you want to change one field within an aspect without modifying others, you need to do a read-modify-write to not overwrite existing fields. To support these scenarios, DataHub supports PATCH based operations so that targeted changes to single fields or values within arrays of fields are possible without impacting other existing metadata.
Currently, PATCH support is only available for a selected set of aspects, so before pinning your hopes on using PATCH as a way to make modifications to aspect values, confirm whether your aspect supports PATCH semantics. The complete list of Aspects that are supported are maintained here. In the near future, we do have plans to automatically support PATCH semantics for aspects by default.
How To Use Patch
Examples for using Patch are sprinkled throughout the API guides. Here's how to find the appropriate classes for the language for your choice.
- Java SDK
- Python SDK
The Java Patch builders are aspect-oriented and located in the datahub-client module under the datahub.client.patch
namespace.
Here are a few illustrative examples using the Java Patch builders:
Add Custom Properties
# Inlined from /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/DatasetCustomPropertiesAdd.java
package io.datahubproject.examples;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder;
import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.MetadataWriteResponse;
import datahub.client.rest.RestEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
@Slf4j
class DatasetCustomPropertiesAdd {
private DatasetCustomPropertiesAdd() {}
/**
* Adds properties to an existing custom properties aspect without affecting any existing
* properties
*
* @param args
* @throws IOException
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args)
throws IOException, ExecutionException, InterruptedException {
MetadataChangeProposal datasetPropertiesProposal =
new DatasetPropertiesPatchBuilder()
.urn(UrnUtils.toDatasetUrn("hive", "fct_users_deleted", "PROD"))
.addCustomProperty("cluster_name", "datahubproject.acryl.io")
.addCustomProperty("retention_time", "2 years")
.build();
String token = "";
RestEmitter emitter = RestEmitter.create(b -> b.server("http://localhost:8080").token(token));
try {
Future<MetadataWriteResponse> response = emitter.emit(datasetPropertiesProposal);
System.out.println(response.get().getResponseContent());
} catch (Exception e) {
log.error("Failed to emit metadata to DataHub", e);
throw e;
} finally {
emitter.close();
}
}
}
Add and Remove Custom Properties
# Inlined from /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/DatasetCustomPropertiesAddRemove.java
package io.datahubproject.examples;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder;
import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.MetadataWriteResponse;
import datahub.client.rest.RestEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
@Slf4j
class DatasetCustomPropertiesAddRemove {
private DatasetCustomPropertiesAddRemove() {}
/**
* Applies Add and Remove property operations on an existing custom properties aspect without
* affecting any other properties
*
* @param args
* @throws IOException
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args)
throws IOException, ExecutionException, InterruptedException {
MetadataChangeProposal datasetPropertiesProposal =
new DatasetPropertiesPatchBuilder()
.urn(UrnUtils.toDatasetUrn("hive", "fct_users_deleted", "PROD"))
.addCustomProperty("cluster_name", "datahubproject.acryl.io")
.removeCustomProperty("retention_time")
.build();
String token = "";
RestEmitter emitter = RestEmitter.create(b -> b.server("http://localhost:8080").token(token));
try {
Future<MetadataWriteResponse> response = emitter.emit(datasetPropertiesProposal);
System.out.println(response.get().getResponseContent());
} catch (Exception e) {
log.error("Failed to emit metadata to DataHub", e);
throw e;
} finally {
emitter.close();
}
}
}
Add Data Job Lineage
# Inlined from /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/DataJobLineageAdd.java
package io.datahubproject.examples;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.aspect.patch.builder.DataJobInputOutputPatchBuilder;
import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.MetadataWriteResponse;
import datahub.client.rest.RestEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
@Slf4j
class DataJobLineageAdd {
private DataJobLineageAdd() {}
/**
* Adds lineage to an existing DataJob without affecting any lineage
*
* @param args
* @throws IOException
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args)
throws IOException, ExecutionException, InterruptedException {
String token = "";
try (RestEmitter emitter =
RestEmitter.create(b -> b.server("http://localhost:8080").token(token))) {
MetadataChangeProposal dataJobIOPatch =
new DataJobInputOutputPatchBuilder()
.urn(
UrnUtils.getUrn(
"urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)"))
.addInputDatasetEdge(
DatasetUrn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"))
.addOutputDatasetEdge(
DatasetUrn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:kafka,SampleHiveDataset,PROD)"))
.addInputDatajobEdge(
DataJobUrn.createFromString(
"urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_123)"))
.addInputDatasetField(
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD),user_id)"))
.addOutputDatasetField(
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)"))
.build();
Future<MetadataWriteResponse> response = emitter.emit(dataJobIOPatch);
System.out.println(response.get().getResponseContent());
} catch (Exception e) {
log.error("Failed to emit metadata to DataHub", e);
throw new RuntimeException(e);
}
}
}
The Python Patch builders are entity-oriented and located in the metadata-ingestion module and located in the datahub.specific
module.
Here are a few illustrative examples using the Python Patch builders:
Add Properties to Dataset
# Inlined from /metadata-ingestion/examples/library/dataset_add_properties.py
import logging
from typing import Union
from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Get an emitter, either REST or Kafka, this example shows you both
def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
USE_REST_EMITTER = True
if USE_REST_EMITTER:
gms_endpoint = "http://localhost:8080"
return DataHubRestEmitter(gms_server=gms_endpoint)
else:
kafka_server = "localhost:9092"
schema_registry_url = "http://localhost:8081"
return DatahubKafkaEmitter(
config=KafkaEmitterConfig(
connection=KafkaProducerConnectionConfig(
bootstrap=kafka_server, schema_registry_url=schema_registry_url
)
)
)
dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")
with get_emitter() as emitter:
for patch_mcp in (
DatasetPatchBuilder(dataset_urn)
.add_custom_property("cluster_name", "datahubproject.acryl.io")
.add_custom_property("retention_time", "2 years")
.build()
):
emitter.emit(patch_mcp)
log.info(f"Added cluster_name, retention_time properties to dataset {dataset_urn}")
How Patch works
To understand how patching works, it's important to understand a bit about our models. Entities are comprised of Aspects which can be reasoned about as JSON representations of the object models. To be able to patch these we utilize JsonPatch. The components of a JSON Patch are the path, operation, and value.
Path
The JSON path refers to a value within the schema. This can be a single field or can be an entire object reference depending on what the path is. For our patches we are primarily targeting single fields or even single array elements within a field. To be able to target array elements by id, we go through a translation process of the schema to transform arrays into maps. This allows a path to reference a particular array element by key rather than by index, for example a specific tag urn being added to a dataset. This is important to note that for some fields in our schema that are arrays which do not necessarily restrict uniqueness, this puts a uniqueness constraint on the key. The key for objects stored in arrays is determined manually by examining the schema and a long term goal is to make these keys annotation driven to reduce the amount of code needed to support additional aspects to be patched. There is a generic patch endpoint, but it requires any array field keys to be specified at request time, putting a lot of burden on the API user.
Examples
A patch path for targeting an upstream dataset:
/upstreams/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)
Breakdown:
/upstreams
-> References the upstreams field of the UpstreamLineage aspect, this is an array of Upstream objects where the key is the Urn/urn:...
-> The dataset to be targeted by the operation
A patch path for targeting a fine grained lineage upstream:
/fineGrainedLineages/TRANSFORM/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),foo)/urn:li:query:queryId/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD),bar)
Breakdown:
/fineGrainedLineages
-> References the fineGrainedLineages field on an UpstreamLineage, this is an array of FineGrainedLineage objects keyed on transformOperation, downstream urn, and query urn/TRANSFORM
-> transformOperation, one of the fields determining the key for a fineGrainedLineage/urn:li:schemaField:...
-> The downstream schemaField referenced in this schema, part of the key for a fineGrainedLineage/urn:li:query:...
-> The query urn this relationship was derived from, part of the key for a fineGrainedLineage/urn:li:schemaField:
-> The upstream urn that is being targeted by this patch operation
This showcases that in some cases the key for objects is simple, in others in can be complex to determine, but for our fully supported use cases we have SDK support on both the Java and Python side that will generate these patches for you as long as you supply the required method parameters. Path is generally the most complicated portion of a patch to reason about as it requires intimate knowledge of the schema and models.
Operation
Operation is a limited enum of a few supported types pulled directly from the JSON Patch spec. DataHub only supports ADD
and REMOVE
of these options
as the other patch operations do not currently have a use case within our system.
Add
Add is a bit of a misnomer for the JSON Patch spec, it is not an explicit add but an upsert/replace. If the path specified does not exist, it will be created, but if the path already exists the value will be replaced. Patch operations apply at a path level so it is possible to do full replaces of arrays or objects in the schema using adds, but generally the most useful use case for patch is to add elements to arrays without affecting the other elements as full upserts are supported by standard ingestion.
Remove
Remove operations require the path specified to be present, or an error will be thrown, otherwise they operate as one would expect. The specified path will be removed from the aspect.
Value
Value is the actual information that will be stored at a path. If the path references an object then this will include the JSON key value pairs for that object.
Examples
An example UpstreamLineage object value:
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/my-folder/my-file.txt,PROD)",
"type": "TRANSFORMED"
}
For the previous path example (/upstreams/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)
), this object would represent the UpstreamLineage object for that path.
This specifies the required fields to properly represent that object. Note: by modifying this path, you could reference a single field within the UpstreamLineage object itself, like so:
{
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)/type",
"op": "ADD",
"value": "VIEW"
}
Implementation details
Template Classes
Template classes are the mechanism that maps fields to their corresponding JSON paths. Since DataMaps are not true JSON, first we convert a RecordTemplate to a JSON String, perform any additional process to map array fields to their keys, apply the patch, and then convert the JSON object back to a RecordTemplate to work with the rest of the application.
The template classes we currently support can be found in the entity-registry
module. They are split up by aspect, with the GenericTemplate applying to any non-directly supported aspects.
The GenericTemplate allows for use cases that we have not gotten around to directly support yet, but puts more burden on the user to generate patches correctly.
The template classes are utilized in EntityServiceImpl
where a MCP is determined to be either a patch or standard upsert which then routes through to the stored templates registered on the EntityRegistry.
The core logical flow each Template runs through is set up in the Template
interface, with some more specific logic in the lower level interfaces for constructing/deconstructing array field keys.
Most of the complexity around these classes is knowledge of schema and JSON path traversals.
ArrayMergingTemplate & CompoundKeyTemplate
ArrayMergingTemplate
is utilized for any aspect which has array fields and may either be used directly or use CompoundKeyTemplate
. ArrayMergingTemplate
is the simpler one that can only be used directly for
single value keys. CompoundKeyTemplate
allows for support of multi-field keys. For more complex examples like FineGrainedLineage, further logic is needed to construct a key as it is not generalizable to other aspects, see UpstreamLineageTemplate
for full special case implementation.
PatchBuilders
There are patch builder SDK classes for constructing patches in both Java and Python. The Java patch builders all extend AbstractMultiFieldPatchBuilder
which sets up the
base functionality for patch builder subtypes. Each implementation of this abstract class is targeted at a particular aspect and contains specific field based update methods
for the most common use cases. On the Python side patch builders live in the src/specific/
directory and are organized by entity type.