AWS Data Wrangler

33 - Amazon Neptune

Note: to be able to use SPARQL you must either install SPARQLWrapper or install AWS Data Wrangler with sparql extra:

!pip install awswrangler[sparql]


The first step to using AWS Data Wrangler with Amazon Neptune is to import the library and create a client connection.

Note: Connecting to Amazon Neptune requires that the application you are running has access to the Private VPC where Neptune is located. Without this access you will not be able to connect using AWS Data Wrangler.

import awswrangler as wr
import pandas as pd

url='<INSERT CLUSTER ENDPOINT>' # The Neptune Cluster endpoint
iam_enabled = False # Set to True/False based on the configuration of your cluster
neptune_port = 8182 # Set to the Neptune Cluster Port, Default is 8182
client = wr.neptune.connect(url, neptune_port, iam_enabled=iam_enabled)

Return the status of the cluster

Retrieve Data from Neptune using AWS Data Wrangler

AWS Data Wrangler supports querying Amazon Neptune using TinkerPop Gremlin and openCypher for property graph data or SPARQL for RDF data.


query = "g.E().project('source', 'target').by(outV().id()).by(inV().id()).limit(5)"
df = wr.neptune.execute_gremlin(client, query)


query = "SELECT ?s ?o WHERE { ?s ?p ?o .} LIMIT 5"
df = wr.neptune.execute_sparql(client, query)


query = "MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 5"
df = wr.neptune.execute_opencypher(client, query)

Saving Data using AWS Data Wrangler

AWS Data Wrangler supports saving Pandas DataFrames into Amazon Neptune using either a property graph or RDF data model.

Property Graph

If writing to a property graph then DataFrames for vertices and edges must be written separately. DataFrames for vertices must have a ~label column with the label and a ~id column for the vertex id.

If the ~id column does not exist, the specified id does not exists, or is empty then a new vertex will be added.

If no ~label column exists then writing to the graph will be treated as an update of the element with the specified ~id value.

DataFrames for edges must have a ~id, ~label, ~to, and ~from column. If the ~id column does not exist the specified id does not exists, or is empty then a new edge will be added. If no ~label, ~to, or ~from column exists an exception will be thrown.

Add Vertices/Nodes

import uuid
import random
import string
def _create_dummy_vertex():
    data = dict()
    data["~id"] = uuid.uuid4()
    data["~label"] = "foo"
    data["int"] = random.randint(0, 1000)
    data["str"] = "".join(random.choice(string.ascii_lowercase) for i in range(10))
    data["list"] = [random.randint(0, 1000), random.randint(0, 1000)]
    return data

data = [_create_dummy_vertex(), _create_dummy_vertex(), _create_dummy_vertex()]
df = pd.DataFrame(data)
res = wr.neptune.to_property_graph(client, df)
query = f"MATCH (s) WHERE id(s)='{data[0]['~id']}' RETURN s"
df = wr.neptune.execute_opencypher(client, query)

Add Edges

import uuid
import random
import string
def _create_dummy_edge():
    data = dict()
    data["~id"] = uuid.uuid4()
    data["~label"] = "bar"
    data["~to"] = uuid.uuid4()
    data["~from"] = uuid.uuid4()
    data["int"] = random.randint(0, 1000)
    data["str"] = "".join(random.choice(string.ascii_lowercase) for i in range(10))
    return data

data = [_create_dummy_edge(), _create_dummy_edge(), _create_dummy_edge()]
df = pd.DataFrame(data)
res = wr.neptune.to_property_graph(client, df)
query = f"MATCH (s)-[r]->(d) WHERE id(r)='{data[0]['~id']}' RETURN r"
df = wr.neptune.execute_opencypher(client, query)

Update Existing Nodes

wr.neptune.execute_gremlin(client, f"g.addV().property(, '{str(idval)}')")
query = f"MATCH (s) WHERE id(s)='{idval}' RETURN s"
df = wr.neptune.execute_opencypher(client, query)
data = [{"~id": idval, "age": 50}]
df = pd.DataFrame(data)
res = wr.neptune.to_property_graph(client, df)
df = wr.neptune.execute_opencypher(client, query)

Setting cardinality based on the header

If you would like to save data using single cardinality then you can postfix (single) to the column header and set use_header_cardinality=True (default). e.g. A column named name(single) will save the name property as single cardinality. You can disable this by setting by setting use_header_cardinality=False.

data = [_create_dummy_vertex()]
df = pd.DataFrame(data)
# Adding (single) to the column name in the DataFrame will cause it to write that property as `single` cardinality
df.rename(columns={"int": "int(single)"}, inplace=True)
res = wr.neptune.to_property_graph(client, df, use_header_cardinality=True)

# This can be disabled by setting `use_header_cardinality = False`
df.rename(columns={"int": "int(single)"}, inplace=True)
res = wr.neptune.to_property_graph(client, df, use_header_cardinality=False)


The DataFrame must consist of triples with column names for the subject, predicate, and object specified. If none are provided than s, p, and o are the default.

If you want to add data into a named graph then you will also need the graph column, default is g.

Write Triples

def _create_dummy_triple():
    data = dict()
    data["s"] = "foo"
    data["p"] = uuid.uuid4()
    data["o"] = random.randint(0, 1000)
    return data

data = [_create_dummy_triple(), _create_dummy_triple(), _create_dummy_triple()]
df = pd.DataFrame(data)
res = wr.neptune.to_rdf_graph(client, df)
query = "SELECT ?o WHERE { <foo> <" + str(data[0]['p']) + "> ?o .}"
df = wr.neptune.execute_sparql(client, query)

Write Quads

def _create_dummy_quad():
    data = _create_dummy_triple()
    data["g"] = "bar"
    return data

data = [_create_dummy_quad(), _create_dummy_quad(), _create_dummy_quad()]
df = pd.DataFrame(data)
res = wr.neptune.to_rdf_graph(client, df)
query = "SELECT ?o WHERE { <foo> <" + str(data[0]['p']) + "> ?o .}"
df = wr.neptune.execute_sparql(client, query)

Flatten DataFrames

One of the complexities of working with a row/columns paradigm, such as Pandas, with graph results set is that it is very common for graph results to return complex and nested objects. To help simplify using the results returned from a graph within a more tabular format we have added a method to flatten the returned Pandas DataFrame.

Flattening the DataFrame

client = wr.neptune.connect(url, 8182, iam_enabled=False)
query = "MATCH (n) RETURN n LIMIT 1"
df = wr.neptune.execute_opencypher(client, query)

Removing the prefixing of the parent column name

df_new=wr.neptune.flatten_nested_df(df, include_prefix=False)

Specifying the column header seperator

df_new=wr.neptune.flatten_nested_df(df, seperator='|')

Putting it into a workflow

pip install igraph networkx

Running PageRank using NetworkX

import networkx as nx

# Retrieve Data from neptune
client = wr.neptune.connect(url, 8182, iam_enabled=False)
query = "MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 100"
df = wr.neptune.execute_opencypher(client, query)

# Run PageRank
G=nx.from_pandas_edgelist(df, edge_attr=True)
pg = nx.pagerank(G)

# Save values back into Neptune
for k in pg.keys():
    rows.append({'~id': k, 'pageRank_nx(single)': pg[k]})
pg_df=pd.DataFrame(rows, columns=['~id','pageRank_nx(single)'])
res = wr.neptune.to_property_graph(client, pg_df, use_header_cardinality=True)

# Retrieve newly saved data
query = "MATCH (n:airport) WHERE n.pageRank_nx IS NOT NULL RETURN n.code, n.pageRank_nx ORDER BY n.pageRank_nx DESC LIMIT 5"
df = wr.neptune.execute_opencypher(client, query)

Running PageRank using iGraph

import igraph as ig

# Retrieve Data from neptune
client = wr.neptune.connect(url, 8182, iam_enabled=False)
query = "MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 100"
df = wr.neptune.execute_opencypher(client, query)

# Run PageRank
g = ig.Graph.TupleList(df.itertuples(index=False), directed=True, weights=False)
pg = g.pagerank()

# Save values back into Neptune
for idx, v in enumerate(g.vs):
    rows.append({'~id': v['name'], 'pageRank_ig(single)': pg[idx]})
pg_df=pd.DataFrame(rows, columns=['~id','pageRank_ig(single)'])
res = wr.neptune.to_property_graph(client, pg_df, use_header_cardinality=True)

# Retrieve newly saved data
query = "MATCH (n:airport) WHERE n.pageRank_ig IS NOT NULL RETURN n.code, n.pageRank_ig ORDER BY n.pageRank_ig DESC LIMIT 5"
df = wr.neptune.execute_opencypher(client, query)