Save data to external storage

Save as txt files

To save a Spatial DataFrame to permanent storage such as Hive tables or HDFS, you can simply convert each geometry in the Geometry type column back to a plain String and save the plain DataFrame wherever you want.

Use the following code to convert the Geometry column in a DataFrame back to a WKT string column:

SELECT ST_AsText(countyshape)
FROM polygondf

Then you can use any Spark writer to save this DataFrame.

df.write.format("YOUR_FORMAT").save("YOUR_PATH")
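For reference, ST_AsText emits standard WKT strings; the sketch below (plain Python, not Sedona API — the helper names are made up for illustration) shows the kind of text that ends up in the saved column and how a downstream consumer might parse it back.

```python
# Illustrative only: serialize and parse a WKT point string, mimicking
# the text that ST_AsText writes into the saved column.
def point_to_wkt(x: float, y: float) -> str:
    """Serialize a 2-D point to Well-Known Text."""
    return f"POINT ({x:g} {y:g})"

def wkt_to_point(wkt: str) -> tuple:
    """Parse a 'POINT (x y)' WKT string back into coordinates."""
    body = wkt.removeprefix("POINT (").removesuffix(")")
    x, y = body.split()
    return float(x), float(y)

wkt = point_to_wkt(102.0, 0.5)
print(wkt)                # POINT (102 0.5)
print(wkt_to_point(wkt))  # (102.0, 0.5)
```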

Note

SedonaSQL provides many functions for serializing the Geometry column; please read the SedonaSQL API docs for the full list.

Save as GeoParquet

Sedona can directly save a DataFrame with the Geometry column as a GeoParquet file. You need to specify geoparquet as the write format. The Geometry type will be preserved in the GeoParquet file.

df.write.format("geoparquet").save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")

To maximize the performance of Sedona GeoParquet filter pushdown, we suggest that you sort the data by their geohash values (see ST_GeoHash) and then save as a GeoParquet file. An example is as follows:

SELECT col1, col2, geom, ST_GeoHash(geom, 5) as geohash
FROM spatialDf
ORDER BY geohash
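To see why sorting by geohash helps, here is a minimal pure-Python geohash encoder (illustrative only — Sedona computes this natively with ST_GeoHash). Nearby points share long common prefixes, so lexicographic ordering co-locates them in the same row groups, which is what makes filter pushdown effective.

```python
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"

def geohash_encode(lat: float, lon: float, precision: int = 5) -> str:
    """Encode a lat/lon pair as a standard base-32 geohash string."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    bits, bit_count, even = 0, 0, True
    out = []
    while len(out) < precision:
        if even:  # even-numbered bits refine longitude
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits = (bits << 1) | 1
                lon_lo = mid
            else:
                bits = bits << 1
                lon_hi = mid
        else:     # odd-numbered bits refine latitude
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits = (bits << 1) | 1
                lat_lo = mid
            else:
                bits = bits << 1
                lat_hi = mid
        even = not even
        bit_count += 1
        if bit_count == 5:  # every 5 bits yields one base-32 character
            out.append(_BASE32[bits])
            bits, bit_count = 0, 0
    return "".join(out)

print(geohash_encode(57.64911, 10.40744, 11))  # u4pruydqqvj
```

Points that are close on the map agree in their leading characters, so `ORDER BY geohash` clusters them together on disk.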

Save as Single Line GeoJSON Features

Sedona can save a Spatial DataFrame to a single-line GeoJSON feature file.

Scala:

Adapter.toSpatialRdd(df, "geometry").saveAsGeoJSON("YOUR/PATH.json")

Python:

from sedona.utils.adapter import Adapter

Adapter.toSpatialRdd(df, "geometry").saveAsGeoJSON("YOUR/PATH.json")

The generated file will contain one feature per line, like this:

{"type":"Feature","geometry":{"type":"Point","coordinates":[102.0,0.5]},"properties":{"prop0":"value0"}}
{"type":"Feature","geometry":{"type":"LineString","coordinates":[[102.0,0.0],[103.0,1.0],[104.0,0.0],[105.0,1.0]]},"properties":{"prop0":"value1"}}
{"type":"Feature","geometry":{"type":"Polygon","coordinates":[[[100.0,0.0],[101.0,0.0],[101.0,1.0],[100.0,1.0],[100.0,0.0]]]},"properties":{"prop0":"value2"}}
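Because each line is a complete GeoJSON Feature, the file can be consumed by any newline-delimited JSON reader; a minimal sketch using only the Python standard library:

```python
import json

# Two lines in the same shape as the generated file.
lines = [
    '{"type":"Feature","geometry":{"type":"Point","coordinates":[102.0,0.5]},"properties":{"prop0":"value0"}}',
    '{"type":"Feature","geometry":{"type":"LineString","coordinates":[[102.0,0.0],[103.0,1.0]]},"properties":{"prop0":"value1"}}',
]

# Each line parses independently; no enclosing FeatureCollection is needed.
features = [json.loads(line) for line in lines]
for feature in features:
    print(feature["geometry"]["type"], feature["properties"]["prop0"])
```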

Save to PostGIS

Unfortunately, the Spark SQL JDBC data source doesn't support creating geometry types in PostGIS using the 'createTableColumnTypes' option. Only the Spark built-in types are recognized. This means that you'll need to manage your PostGIS schema separately from Spark. One way to do this is to create the table with the correct geometry column before writing data to it with Spark. Alternatively, you can write your data to the table using Spark and then manually alter the column to be a geometry type afterward.

PostGIS uses EWKB to serialize geometries. If you convert your geometries to EWKB format in Sedona, you don't have to do any additional conversion in PostGIS.
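As an illustration of the format (not something you need to implement — ST_AsEWKB does it for you), the EWKB for an SRID-tagged point is a small binary record: a byte-order flag, a type word with the SRID bit set, the SRID, then the coordinates. A pure-Python sketch:

```python
import struct

def point_to_ewkb_hex(x: float, y: float, srid: int = 4326) -> str:
    """Build little-endian EWKB for a 2-D point with an embedded SRID."""
    WKB_POINT = 1
    SRID_FLAG = 0x20000000  # this bit in the type word signals a trailing SRID
    # "<" = little-endian: B byte-order flag, I type word, I SRID, two doubles.
    return struct.pack("<BIIdd", 1, WKB_POINT | SRID_FLAG, srid, x, y).hex()

print(point_to_ewkb_hex(1.0, 2.0))
# 0101000020e6100000000000000000f03f0000000000000040
```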

Step 1: In PostGIS

my_postgis_db# create table my_table (id int8, geom geometry);

Step 2: In Spark

df.withColumn("geom", expr("ST_AsEWKB(geom)"))
  .write.format("jdbc")
  .option("truncate", "true") // Don't let Spark recreate the table.
  // Other options.
  .save()

Step 3 (optional): In PostGIS

If you didn't create the table before writing you can change the type afterward.

my_postgis_db# alter table my_table alter column geom type geometry;

Save to GeoPandas

A Sedona DataFrame can be converted directly to a GeoPandas GeoDataFrame.

import geopandas as gpd

df = spatialDf.toPandas()
gdf = gpd.GeoDataFrame(df, geometry="geometry")

You can then plot the GeoPandas DataFrame using many tools in the GeoPandas ecosystem.

gdf.plot(
    figsize=(10, 8),
    column="value",
    legend=True,
    cmap='YlOrBr',
    scheme='quantiles',
    edgecolor='lightgray'
)

Save to Snowflake

Sedona DataFrames can be exported to Snowflake tables for persistent storage.

To enable bi-directional communication between Spark and Snowflake, a map of configuration parameters must be passed as options to the Sedona context object.

The configuration parameters include connection and context options; details on their possible values can be found in the Snowflake Spark connector documentation.

The writer also needs a SaveMode, passed via mode, which specifies how to handle collisions with an existing table, if any.

Scala:

val sfOptions = Map("sfUrl" -> snowflakeUrl, "sfUser" -> username, "sfPassword" -> password, "sfDatabase" -> database, "sfSchema" -> schema) // snowflakeUrl is https://<accountIdentifier>.snowflakecomputing.com
val dest_table_name = "<DESTINATION_TABLE_NAME>"
val save_mode = "append" // Append data if the table already exists. Other possible values: errorifexists, ignore, overwrite.
df_dist_from_ny.write.format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("dbtable", dest_table_name)
  .mode(saveMode = save_mode)
  .save()
Java:

import java.util.HashMap;

HashMap<String, String> sfOptions = new HashMap<>();
sfOptions.put("sfUrl", snowflakeUrl); // snowflakeUrl is https://<accountIdentifier>.snowflakecomputing.com
sfOptions.put("sfUser", username);
sfOptions.put("sfPassword", password);
sfOptions.put("sfDatabase", database);
sfOptions.put("sfSchema", schema);
String dest_table_name = "<DESTINATION_TABLE_NAME>";
String save_mode = "append"; // Append data if the table already exists. Other possible values: errorifexists, ignore, overwrite.
df_dist_from_ny.write.format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", dest_table_name)
    .mode(save_mode)
    .save();
Python:

sfOptions = {"sfUrl": snowflake_url, "sfUser": username, "sfPassword": password, "sfDatabase": database, "sfSchema": schema}  # snowflake_url is https://<accountIdentifier>.snowflakecomputing.com
dest_table_name = "<DESTINATION_TABLE_NAME>"
save_mode = "append"  # Append data if the table already exists. Other possible values: errorifexists, ignore, overwrite.
df_final.write.format(SNOWFLAKE_SOURCE_NAME)\
    .options(**sfOptions)\
    .option("dbtable", dest_table_name)\
    .mode(saveMode=save_mode)\
    .save()

Save to an AWS RDS PostGIS instance

Sedona DataFrames can be saved to PostGIS tables hosted in an AWS RDS instance for persistent storage.

A map of configuration and context options must be passed to establish a connection with the RDS instance.

If you're unable to connect, double-check that the instance is reachable from the server running this code. For more information on intra- or inter-VPC connections to an RDS instance, consult the AWS RDS documentation.

The writer also needs a SaveMode, passed via mode, which specifies how to handle collisions with an existing table, if any.

Scala:

val url = "<URL>" // jdbc:postgresql://ENDPOINT/DATABASE_NAME
val driver = "org.postgresql.Driver"
val user = "<USERNAME>"
val password = "<PASSWORD>"
val options = Map("url" -> url, "user" -> user, "password" -> password, "driver" -> driver)
val dest_table_name = "<DESTINATION_TABLE_NAME>"
val save_mode = "append" // Append data if the table already exists. Other possible values: errorifexists, ignore, overwrite.
df_dist_from_ny.write.format("jdbc")
  .options(options)
  .option("dbtable", dest_table_name)
  .mode(saveMode = save_mode)
  .save()
Java:

import java.util.HashMap;

HashMap<String, String> options = new HashMap<>();
options.put("url", url); // url is jdbc:postgresql://ENDPOINT/DATABASE_NAME
options.put("user", username);
options.put("password", password);
options.put("driver", "org.postgresql.Driver");
String dest_table_name = "<DESTINATION_TABLE_NAME>";
String save_mode = "append"; // Append data if the table already exists. Other possible values: errorifexists, ignore, overwrite.
df_dist_from_ny.write.format("jdbc")
    .options(options)
    .option("dbtable", dest_table_name)
    .mode(save_mode)
    .save();
Python:

url = '<URL>'  # jdbc:postgresql://ENDPOINT/DATABASE_NAME
driver = 'org.postgresql.Driver'
user = '<USERNAME>'
password = '<PASSWORD>'
options = {"url": url, "driver": driver, "user": user, "password": password}
dest_table_name = "<DESTINATION_TABLE_NAME>"
save_mode = "append"  # Append data if the table already exists. Other possible values: errorifexists, ignore, overwrite.
df_final.write.format("jdbc")\
    .options(**options)\
    .option("dbtable", dest_table_name)\
    .mode(saveMode=save_mode)\
    .save()

Last update: October 17, 2023 01:49:40