My Challenges
I recently got a chance to take a look at Prefect, a tool in the same space as Apache Airflow: both provide a way to manage and orchestrate data pipeline code.
In my job, I work as a Fraud Analyst, but my role is really a cross between a data engineering role and a data analyst role, with a strong preference for the data engineering side. I spend most days writing, maintaining, or modifying data integrations in Python or SQL. My department is a Windows shop with a SQL Server database for our departmental analysis and as the backend for some home-grown tools, and we also interact with numerous other database technologies throughout the broader company. The code I write glues all of these pieces together, in addition to authoring some of the tools we deliver to our front-end users.
I give the above context because we need a way to schedule our code, and we need decent observability of the code and its executions. We currently tie everything together with Windows Task Scheduler for the Python code and SQL Server Agent for the SQL. SQL Server Agent is the more robust of the two, but neither provides great observability or out-of-the-box monitoring. Also, when the schedulers stop working, which recently happened to one of them, it is hard to know anything is wrong until data is identified as missing. We never want to be notified by downstream analysts that our code is broken; we want to discover and fix it ourselves before they notice, before there is a production issue. Our current systems make that difficult.
Getting a Sample Data Pipeline Running
To address this, I began evaluating Prefect. With some simple changes to our Python code, this tool would enable us to make our code more observable and simplify how we identify exceptions and issues that need attention.
The tool is built around the concept of a flow, which at its core is the parent structure. Flows can contain tasks, and, though I didn't try this in my testing, I believe flows can also call other flows as sub-flows. This structure provides a view into your code as it runs and after it finishes, to help you find performance issues or bugs.
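As a rough illustration of that structure (the sub-flow part is something I haven't tested myself, so treat this as a sketch of how I understand Prefect's nesting to work; the function names are just placeholders):

from prefect import flow, task

@task
def extract_rows() -> list[int]:
    # an ordinary task run
    return [1, 2, 3]

@flow
def transform(rows: list[int]) -> list[int]:
    # calling a @flow-decorated function from another flow creates a sub-flow run
    return [r * 10 for r in rows]

@flow
def parent_pipeline():
    rows = extract_rows()    # shows up as a task run under the parent flow
    print(transform(rows))   # shows up as a nested sub-flow run

if __name__ == "__main__":
    parent_pipeline()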
The screenshot above shows a simple data pipeline that reads a joke from a joke API and inserts it into a SQL Server database. In this particular flow run, the procedure that writes to the database has failed. This is obviously a very simple example, but it isn't hard to see how this could be valuable for a complex script with many subroutines or processes. Below this part of the screen, the entire log from the execution of this flow is presented, enabling troubleshooting right from the orchestration website.
Building code for this integration is very straightforward.
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
import requests
import json
import pyodbc


@task(name="write-joke-to-db")
def write_joke_to_db(joke: str, api_id: str) -> None:
    server = '10.0.20.77'
    database = 'flippy'
    username = 'jokeuser'
    # The password comes from a Prefect Secret block rather than the source code
    password = Secret.load("sql-server-password")
    connection_string = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password.get()};TrustServerCertificate=yes;"
    # Establish the connection and insert the joke
    connection = pyodbc.connect(connection_string)
    connection.cursor().execute('INSERT INTO flippy.dbo.jokes (joke_text, api_id) values (?, ?)', joke, api_id)
    connection.commit()
    get_run_logger().info('Joke written')


@task(name="get-a-joke")
def get_a_joke() -> tuple[str, str]:
    result = requests.get('https://icanhazdadjoke.com/', headers={'Accept': 'application/json'})
    if result.status_code == 200:
        parsed = json.loads(result.text)
        joke = parsed['joke']
        api_id = parsed['id']
        get_run_logger().info(f'Joke (local): {joke} - {api_id}')
        return joke, api_id
    else:
        raise Exception(f'Failed to get joke from API status code = {result.status_code} result={result}')


@flow(name="get-jokes")
def get_jokes(num_jokes: int = 1):
    for x in range(num_jokes):
        joke, api_id = get_a_joke()
        write_joke_to_db(joke, api_id)


if __name__ == "__main__":
    get_jokes()
This is a very sloppy example of a pipeline I threw together to test the tool; it is not meant as an example of good code. You will note that we add some imports from prefect as well as decorators to identify the entry point of the flow and each task. Also note the Prefect block for secrets, which provides the database user's password. There is also a reference to Prefect's logging, though print statements can be used as well and will appear in the logs. Other than that, the code is the same as we would write if we just wanted a script to do this. It doesn't include anything to persist the logs; it doesn't include anything to schedule the job; it doesn't include anything to notify us when there is a failure. Prefect provides all of this for the minimal cost of decorating our code.
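For completeness, the secret block the task loads has to be registered once ahead of time. It can be created in the Prefect UI, or from Python with something roughly like this (the exact block API may vary by Prefect version):

from prefect.blocks.system import Secret

# One-time setup: store the database password under the name the task expects
Secret(value="the-real-database-password").save("sql-server-password")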
If I were to write the same code with Windows Task Scheduler as my orchestration engine, I would need to build out any notifications and log persistence myself. This may seem trivial, but it has to be done for each script, which is far from ideal and challenging to do consistently.
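To make that concrete, here is a hypothetical sketch of the per-script boilerplate the current setup requires: write your own log file and send your own failure email. The addresses, SMTP host, and run_pipeline function are placeholders; none of this is needed once Prefect handles logging and notifications.

import logging
import smtplib
from email.message import EmailMessage

# Persist logs to a file next to the script
logging.basicConfig(filename="get_jokes.log",
                    format="%(asctime)s %(levelname)s %(message)s",
                    level=logging.INFO)

def notify_failure(error: Exception) -> None:
    # Hand-rolled failure notification, repeated in every script
    msg = EmailMessage()
    msg["Subject"] = "get_jokes pipeline failed"
    msg["From"] = "pipelines@example.com"
    msg["To"] = "fraud-team@example.com"
    msg.set_content(str(error))
    with smtplib.SMTP("mail.example.com") as smtp:
        smtp.send_message(msg)

try:
    run_pipeline()  # placeholder for the actual work, e.g. the get_jokes() logic above
except Exception as exc:
    logging.exception("Pipeline failed")
    notify_failure(exc)
    raise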
Deploying the Code
I deployed the code above by setting up a Windows share on my test server. I developed all of this on my own hardware at home before bringing the recommendation into the office, using my Mac laptop to develop and deploy the code. Once I had the right libraries installed in my Python virtual environment, deploying it was a single command line:
~/dev/prefect_cloud_test via python v3.11.4
❯ prefect deployment build jokes.py:get_jokes --work-queue default --name get-joke-pipeline --storage-block smb/files --interval 120 -a
This command builds the YAML file that controls the deployment. It tells Prefect where the flow starts, which work queue to deploy the job into, gives the flow a name, and specifies the storage location (which is set up in the Prefect interface) for the job's code. The command also sets up a schedule, in this case every two minutes, and the -a flag applies the YAML file immediately to get the job running. To deploy changes, you need only rerun the same command.
This ability to manage code deployment completely from the command line is a nice feature. Another note about the deployment: only the YAML metadata goes to the Prefect server; the actual code is delivered by your machine to whatever storage location you configured and is never sent to the Prefect server.
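The same deployment can also be expressed in Python rather than on the command line. This is a rough sketch based on my understanding of the Prefect 2 deployment API; import paths and parameter names may differ between versions, and I stuck with the CLI in my own testing:

from prefect.blocks.core import Block
from prefect.deployments import Deployment

from jokes import get_jokes  # the flow defined earlier

# Load the same SMB storage block ("smb/files") configured in the Prefect UI
storage = Block.load("smb/files")

deployment = Deployment.build_from_flow(
    flow=get_jokes,
    name="get-joke-pipeline",
    work_queue_name="default",
    storage=storage,
)
deployment.apply()  # register (or update) the deployment with the server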
Queues and Agents
The software uses an agent on any machine where the code can run. You can set up multiple groups of these agents, and queues for different types of jobs or for load balancing. The server software interacts with these agents, telling them where to find the code to run, and the agents report progress and logs back to the server.
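Starting an agent on a worker machine is itself a one-line command. From memory, pointing it at the same queue used in the deployment above looks like this:

❯ prefect agent start -q default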
Server Types
Prefect offers both an open-source, self-hosted version of the server and a cloud-hosted version. The cloud-hosted version provides free plans with limited functionality as well as paid plans. The advantages of the cloud-based plans include the ability to secure the installation and simpler notifications, among other things.
I used both the open-source version and the cloud version in my testing. Unfortunately, due to the lack of authentication, I don't think the open-source server could be seriously considered by anything but the smallest of organizations, or by anyone who cannot fully isolate their data pipeline architecture from the rest of the network.
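For reference, switching between the two is mostly a matter of what the CLI and agents point at. These are the commands as I remember them from testing (older 2.x releases used prefect orion start instead of prefect server start): the first two start the open-source server and point the CLI at it, while the last authenticates against a Prefect Cloud workspace instead.

❯ prefect server start
❯ prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"
❯ prefect cloud login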
Next Steps
I have suggested that we look into the cloud-based version of this software to replace our current systems. I will post more about this tool if I can work with it more. But, so far, I am excited about the possibility of improving our observability and avoiding the repetitive process of writing notifications and logging code with each project.