
Retrieving files from AWS Glue Job Bookmarks

19 Oct, 2023

AWS Glue is a service for building ETL pipelines that can process large amounts of data. Glue has a feature called job bookmarks that keeps track of which data a job has already processed and which data still needs to be processed. With an S3 data source, for example, it keeps a list of the files that have been processed, so unchanged data in the bucket is not processed again. In our case, we wanted to identify which partitions in S3 had been updated with new files so we could merge them, rather than end up with a large number of small S3 files in each partition, which would hurt performance.

So how do you know which objects, sticking with the S3 example, your Glue job is processing in a specific run? The first step is to create a Glue job and enable the bookmark feature. When the script invokes job.init, it initializes the job and retrieves the bookmark state. Multiple elements make up the “state” of a Glue job, and they are specific to each source, transformation, and sink in the script.
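If you create the job through the API rather than the console, the bookmark feature is enabled by passing the --job-bookmark-option default argument. Here is a minimal sketch with boto3; the job name, role, script location, worker settings, and Glue version are placeholders you would replace with your own values.

import boto3

glue_client = boto3.client("glue")
glue_client.create_job(
    Name="YOUR_JOB_NAME",                       # placeholder job name
    Role="YOUR_GLUE_SERVICE_ROLE_ARN",          # placeholder IAM role for the job
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://YOUR_SCRIPT_BUCKET/your_script.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        "--job-bookmark-option": "job-bookmark-enable",   # turn on job bookmarks
        "--TempDir": "s3://YOUR_TEMP_BUCKET/temporary/",  # temporary directory used later in this post
    },
    GlueVersion="4.0",
    WorkerType="G.1X",
    NumberOfWorkers=2,
)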

Let’s start by specifying our data source: we create a dynamic frame with a transformation_ctx.

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
glue_client = boto3.client("glue")
dyf_source_s3 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["YOUR_S3_BUCKET_PATH"],
    },
    format="parquet",
    transformation_ctx="datasource",  # specify transformation context
)

The transformation_ctx serves as the key used to look up the bookmark state for a specific source in your script. By creating the dynamic frame with a transformation_ctx, we allow Glue to track the bookmark state for this source. To get the list of files that will be processed in a specific run, we need some additional information.
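As an aside, if you only want to inspect the stored bookmark state itself, the Glue API exposes it through get_job_bookmark. A quick, optional check could look like this (the field contains a serialized blob rather than a ready-made file list, which is why we take the longer route below):

# Optional: inspect the raw bookmark state that Glue keeps for this job
bookmark = glue_client.get_job_bookmark(JobName=args["JOB_NAME"])
print(bookmark["JobBookmarkEntry"]["JobBookmark"])  # serialized bookmark state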

First, we’ll retrieve our job definition based on the job name.

job_name = args['JOB_NAME']
response = glue_client.get_job(JobName=job_name)

The response contains detailed information about the Glue job, among which is the temporary directory (TempDir). That is what we are looking for.

temp_dir = response['Job']['DefaultArguments']['--TempDir']
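Depending on how the job was created, --TempDir may not be present in DefaultArguments; that is a cautionary assumption rather than a documented guarantee, but a defensive lookup gives a clearer error than a KeyError:

# Defensive variant: fail with an explicit message if no temporary directory is configured
default_args = response["Job"].get("DefaultArguments", {})
temp_dir = default_args.get("--TempDir")
if not temp_dir:
    raise ValueError("No --TempDir configured for this job; set one in the job arguments.")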

You can also point your script directly at the temporary directory’s S3 path, since you can set it yourself by passing a job argument. The list of files we are looking for is saved under the following path:

{temp_dir}/partitionlisting/{args['JOB_NAME']}/{job_run_id}

The only thing still missing is the job_run_id, which we can retrieve as follows:

job_run_id = args['JOB_RUN_ID']

After getting the job_run_id, we can now access the list of files that your job needs to process:

df = spark.read.json(f"{temp_dir}/partitionlisting/{args['JOB_NAME']}/{job_run_id}/")
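If you prefer not to go through Spark for such a small read, the same prefix can also be listed with plain boto3. This is only a sketch; pagination and error handling are left out:

from urllib.parse import urlparse

# Alternative: read the listing documents directly from S3 instead of via Spark
s3 = boto3.client("s3")
parsed = urlparse(f"{temp_dir}/partitionlisting/{args['JOB_NAME']}/{job_run_id}/")
listing = s3.list_objects_v2(Bucket=parsed.netloc, Prefix=parsed.path.lstrip("/"))
for obj in listing.get("Contents", []):
    body = s3.get_object(Bucket=parsed.netloc, Key=obj["Key"])["Body"].read()
    print(body)  # each object is a JSON document with "path" and "files"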

Putting it all together, our complete job looks like this:

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
glue_client = boto3.client("glue")
dyf_source_s3 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["YOUR_S3_BUCKET_PATH"],
    },
    format="parquet",
    transformation_ctx="datasource",  # specify transformation context
)

# Look up the job definition to find the temporary directory, then read the
# partition listing that Glue wrote there for this run
job_name = args["JOB_NAME"]
response = glue_client.get_job(JobName=job_name)
temp_dir = response["Job"]["DefaultArguments"]["--TempDir"]
job_run_id = args["JOB_RUN_ID"]
df = spark.read.json(f"{temp_dir}/partitionlisting/{args['JOB_NAME']}/{job_run_id}/")
df.show()
job.commit()

Our data frame:

+--------------------+--------------------+
|               files|                path|
+--------------------+--------------------+
|[s3://FILE_PATH1,...|      s3://S3_BUCKET|
+--------------------+--------------------+

After getting this information, you can loop through the list of files and apply your own logic, as sketched below.
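For the small-files problem from the introduction, one way to finish the job is to group the files by their partition prefix and compact each touched partition. This is only a sketch under assumptions: coalesce(1) and the separate YOUR_COMPACTED_PATH output location are illustrative and would need tuning for real data volumes.

from pyspark.sql.functions import explode

# Flatten the manifest (one row per listing document) into one row per file path
file_paths = [
    row["file_path"]
    for row in df.select(explode("files").alias("file_path")).collect()
]

# Group the files by their partition prefix (the S3 "folder" that holds them)
updated_partitions = {path.rsplit("/", 1)[0] for path in file_paths}

for partition in sorted(updated_partitions):
    # Read only the touched partition and rewrite it as fewer, larger Parquet files.
    # Writing to a separate compacted prefix avoids overwriting a path we are still reading from.
    compacted_path = partition.replace("YOUR_S3_BUCKET_PATH", "YOUR_COMPACTED_PATH", 1)
    spark.read.parquet(partition).coalesce(1).write.mode("overwrite").parquet(compacted_path)

Happy Reading!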

Salma Cherif
I am an AWS consulting engineer focusing on data and modern data architectures on AWS. I help clients with their data platforms, from architecture to implementation. I am passionate about helping customers break the technical barriers with their data in an innovative, secure manner to scale and modernize their data architecture.