Creating a Data Pipeline to Monitor Local Crime Trends

Editor
28 Min Read


about examining crime trends in your local area. You know that relevant data exists, and you have some basic analytical skills that you can use to analyze this data. However, this data is changing frequently, and you want to keep your analysis updated with the most recent crime incidents without repeating your analysis. How can we automate this process?

Well, if you’ve stumbled upon this article, you’re in luck! Together, we’ll walk through how to create a data pipeline to extract local police log data, and connect this to a visualization platform to examine local crime trends over time. For this article, we’ll extract data on incidents reported to the Cambridge (MA) Police Department (CPD), and then visualize this data as a dashboard in Metabase.

The final dashboard we’ll create to summarize recent and historical trends in CPD incidents.

Additionally, this article can serve as a general template for anybody looking to write ETL pipelines orchestrated in Prefect, and/or anybody who wants to connect Metabase to their data stores to create insightful analyses/reports.

Note: I have no affiliation with Metabase – we’ll simply use Metabase as an example platform to create our final dashboard. There are many other viable alternatives, which are described in this section.

Contents:


Background Knowledge

Before we dive into the pipeline, it’ll be helpful to review the following concepts, or keep these links as reference as you read. 


Data of Interest

The data we’ll be working with contains a collection of police log entries, where each entry is a single incident reported to/by the CPD. Each entry contains comprehensive information describing the incident, including but not limited to:

  • Date & time of the incident
  • Type of incident that occurred
  • The street where the incident took place
  • A plaintext description of what happened
A glimpse at the Cambridge Daily Police Log.

Check out the portal for more information about the data.

For monitoring crime trends in Cambridge, MA, creating a data pipeline to extract this data is appropriate, as the data is updated daily (according to their website). If the data was updated less frequently (e.g. annually), then creating a data pipeline to automate this process wouldn’t save us much effort. We could simply revisit the data portal at the end of each year, download the .csv, and complete our analysis. 

Now that we’ve found the appropriate dataset, let’s walk through the implementation.


ETL Pipeline

To go from raw CPD log data to a Metabase dashboard, our project will consist of the following major steps:

  • Extract the data by using its corresponding API.
  • Transforming it to prepare it for storage.
  • Loading it into a PostgreSQL database.
  • Visualizing the data in Metabase.

The data flow of our system will look like the following:

System data flow from data extraction to Metabase visualization.

Our pipeline follows an ETL workflow, which means that we’ll transform the data before importing it into PostgreSQL. This requires loading data into memory while executing data transformations, which may be problematic for large datasets that are too big to fit in memory. In this case, we may consider an ELT workflow, where we transform the data in the same infrastructure where it’s stored. Since our dataset is small (<10k rows), this shouldn’t be a problem, and we’ll take advantage of the fact that pandas makes data transformation easy.

We’ll extract the CPD log data by making a request for the dataset to the Socrata Open Data API. We’ll use sodapy — a python client for the API — to make the request.

We’ll encapsulate this extraction code in its own file — extract.py.

import pandas as pd
from sodapy import Socrata
from dotenv import load_dotenv
import os
from prefect import task

@task(retries=3, retry_delay_seconds=[10, 10, 10]) # retry API request in case of failure
def extract_data():
    '''
    Extract incident data reported to the Cambridge Police Department using the Socrata Open Data API.
    Return the incident data as a Pandas DataFrame.
    '''
    # fetch Socrata app token from .env
    # include this app token when interacting with the Socrata API to avoid request throttling, so we can fetch all the incidents
    load_dotenv()
    APP_TOKEN = os.getenv("SOCRATA_APP_TOKEN") 

    # create Socrata client to interact with the Socrata API (https://github.com/afeld/sodapy)
    client = Socrata(
        "data.cambridgema.gov", 
        APP_TOKEN, 
        timeout=30 # increase timeout from 10s default - sometimes, it takes longer to fetch all the results
    )

    # fetch all data, paginating over results
    DATASET_ID = "3gki-wyrb" # unique identifier for Cambridge Police Log data (https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data)
    results = client.get_all(DATASET_ID)

    # Convert to pandas DataFrame
    results_df = pd.DataFrame.from_records(results)

    return results_df

Notes about the code:

  • Socrata throttles requests if you don’t include an app token that uniquely identifies your application. To fetch all the results, we’ll include this token in our request and put this in a .env file to keep this out of our source code.
  • We’ll specify a 30 second timeout (instead of the 10 second default timeout) when making our request to the Socrata API. From experience using the API, fetching all the results could sometimes take longer than 10 seconds, and 30 seconds was typically enough to avoid timeout errors.
  • We’ll load the fetched results into a pandas DataFrame, since we’ll validate and transform this data using pandas.

ETL: Validate

Now, we’ll do some basic data quality checks on the data.

The data is already fairly clean (which makes sense as it’s provided by the Cambridge Police Department). So, our data quality checks will act more as a “sanity check” that we didn’t ingest anything unexpected.

We’ll validate the following:

  • All the expected columns (as specified here) are present.
  • All IDs are numeric.
  • Datetimes follow ISO 8601 format.
  • There are no missing values in columns that should contain data. Specifically, each incident should have a Datetime, ID, Type, and Location.

We’ll put this validation code in its own file — validate.py.

from datetime import datetime
from collections import Counter
import pandas as pd
from prefect import task

### UTILITIES
def check_valid_schema(df):
    '''
    Check whether the DataFrame content contains the expected columns for the Cambridge Police dataset. 
    Otherwise, raise an error.
    '''
    SCHEMA_COLS = ['date_time', 'id', 'type', 'subtype', 'location', 'last_updated', 'description']
    if Counter(df.columns) != Counter(SCHEMA_COLS):
        raise ValueError("Schema does not match with the expected schema.")
    
def check_numeric_id(df):
    '''
    Convert 'id' values to numeric.
    If any 'id' values are non-numeric, replace them with NaN, so they can be removed downstream in the data transformations.
    '''
    df['id'] = pd.to_numeric(df['id'], errors='coerce')
    return df

def verify_datetime(df):
    '''
    Verify 'date_time' values follow ISO 8601 format (https://www.iso.org/iso-8601-date-and-time-format.html).
    Raise a ValueError if any of the 'date_time' values are invalid.
    '''
    df.apply(lambda row: datetime.fromisoformat(row['date_time']), axis=1) 
    
def check_missing_values(df):
    '''
    Check whether there are any missing values in columns that require data.
    For police logs, each incident should have a datetime, ID, incident type, and location.
    '''
    REQUIRED_COLS = ['date_time', 'id', 'type', 'location']
    for col in REQUIRED_COLS:
        if df[col].isnull().sum() > 0:
            raise ValueError(f"Missing values are present in the '{col}' attribute.")

### VALIDATION LOGIC
@task
def validate_data(df):
    '''
    Check the data satisfies the following data quality checks:
    - schema is valid
    - IDs are numeric
    - datetime follows ISO 8601 format
    - no missing values in columns that require data
    '''
    check_valid_schema(df)

    df = check_numeric_id(df)

    verify_datetime(df)
    
    check_missing_values(df)
        
    return df

When implementing these data quality checks, it’s important to think about how to handle data quality checks that fail. 

  • Do we want our pipeline to fail loudly (e.g. raise an error/crash)?
  • Should our pipeline handle failures silently? For instance, mark data identified to be invalid so that it can be removed downstream?

We’ll raise an error if:

  • The ingested data doesn’t follow the expected schema. It doesn’t make sense to process the data if it doesn’t contain what we expect.
  • Datetime doesn’t follow ISO 8601 format. There’s no standard way to convert incorrect datetime values to its corresponding correct datetime format.
  • The incident contains missing values for any one of datetime, ID, type, and location. Without these values, the incident cannot be described comprehensively.

For records that have non-numeric IDs, we’ll fill them with NaN placeholders and then remove them downstream in the transformation step. These records do not break our analysis if we simply remove them. 

ETL: Transform

Now, we’ll do some transformations on our data to prepare it for storage in PostgreSQL.

We’ll do the following transformations:

  • Remove duplicate rows — we’ll use the ‘ID’ column to identify duplicates.
  • Remove invalid rows — some of the rows that failed the data quality checks were marked with an NaN ‘ID’, so we’ll remove these.
  • Split the datetime column into separate year, month, day, and time columns. In our final analysis, we may want to analyze crime trends by these different time intervals, so we’ll create these additional columns here to simplify our queries downstream.

We’ll put this transformation code in its own file — transform.py.

import pandas as pd
from prefect import task

### UTILITIES
def remove_duplicates(df):
    '''
    Remove duplicate rows from dataframe based on 'id' column. Keep the first occurrence.
    '''
    return df.drop_duplicates(subset=["id"], keep='first')

def remove_invalid_rows(df):
    '''
    Remove rows where the 'id' is NaN, as these IDs were identified as non-numeric.
    '''
    return df.dropna(subset='id')

def split_datetime(df):
    '''
    Split the date_time column into separate year, month, day, and time columns.
    '''
    # convert to datetime
    df['date_time'] = pd.to_datetime(df['date_time'])

    # extract year/month/day/time
    df['year'] = df['date_time'].dt.year
    df['month'] = df['date_time'].dt.month
    df['day'] = df['date_time'].dt.day
    df['hour'] = df['date_time'].dt.hour
    df['minute'] = df['date_time'].dt.minute
    df['second'] = df['date_time'].dt.second

    return df

### TRANSFORMATION LOGIC
@task
def transform_data(df):
    '''
    Apply the following transformations to the passed in dataframe:
    - deduplicate records (keep the first)
    - remove invalid rows
    - split datetime into year, month, day, and time columns
    ''' 

    df = remove_duplicates(df)

    df = remove_invalid_rows(df)

    df = split_datetime(df)

    return df

ETL: Load

Now our data is ready to import into into PostgreSQL.

Before we can import our data, we need to create our PostgreSQL instance. We’ll create one locally using a compose file. This file allows us to define & configure all the services that our application needs.

services: 
  postgres_cpd: # postgres instance for CPD ETL pipeline
    image: postgres:16
    container_name: postgres_cpd_dev
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: cpd_db
    ports:
      - "5433:5432" # Postgres is already on port 5432 on my local machine
    volumes: 
      - pgdata_cpd:/var/lib/postgresql/data
    restart: unless-stopped
  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_dev
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8081:80"
    depends_on: # don't start pg_admin until our postgres instance is running
      - postgres_cpd 

volumes:
  pgdata_cpd: # all data for our postgres_cpd service will be stored here

There are two main services defined here:

  • postgres_cpd — This is our PostgreSQL instance where we’ll store our data.
  • pgadmin —DB admin platform which provides a GUI we can use to query data in our PostgreSQL database. Not functionally required, but useful for checking the data in our database. For more information on connecting to your PostgreSQL database in pgAdmin, click here.

Let’s highlight some important configuration for our postgres_cpd service:

  • container_name: postgres_cpd_dev -> Our service will run in a container (i.e. an isolated process) named postgres_cpd_dev. Docker generates random container names if you don’t specify this, so assigning a name will make it more straightforward to interact with the container.
  • environment: -> We create a Postgres user from credentials stored in our .env file. Additionally, we create a default database, cpd_dev.
  • ports: -> Our PostgreSQL service will listen on port 5432 within the container. However, we’ll map port 5433 on the host machine to port 5432 in the container, allowing us to connect to PostgreSQL from our host machine via port 5433.
  • volumes: -> Our service will store all its data (e.g. configuration, data files) under the following directory within the container: /var/lib/postgresql/data. We’ll mount this container directory to a named Docker volume stored on our local machine, pgdata_cpd. This allows us to persist the database data beyond the lifetime of the container.

Now that we’ve created our PostgreSQL instance, we can execute queries against it. Importing our data into PostgreSQL requires executing two queries against the database:

  • Creating the table that will store the data. 
  • Loading our transformed data into that table. 

Each time we execute a query against our PostgreSQL instance, we need to do the following:

  • Establish our connection to PostgreSQL.
  • Execute the query.
  • Commit the changes & close the connection.
from prefect import task
from sqlalchemy import create_engine
import psycopg2
from dotenv import load_dotenv
import os

# read content from .env, which contains our Postgres credentials
load_dotenv()

def create_postgres_table():
    '''
    Create the cpd_incidents table in Postgres DB (cpd_db) if it doesn't exist.
    '''
    # establish connection to DB
    conn = psycopg2.connect(
        host="localhost",
        port="5433",
        database="cpd_db",
        user=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD")
    )

    # create cursor object to execute SQL
    cur = conn.cursor()
    
    # execute query to create the table
    create_table_query = '''
        CREATE TABLE IF NOT EXISTS cpd_incidents (
            date_time TIMESTAMP,
            id INTEGER PRIMARY KEY,
            type TEXT,
            subtype TEXT,
            location TEXT,
            description TEXT,
            last_updated TIMESTAMP,
            year INTEGER,
            month INTEGER,
            day INTEGER,
            hour INTEGER, 
            minute INTEGER,
            second INTEGER
        )
    '''
    cur.execute(create_table_query)

    # commit changes
    conn.commit()

    # close cursor and connection
    cur.close()
    conn.close()

@task
def load_into_postgres(df):
    '''
    Loads the transformed data passed in as a DataFrame 
    into the 'cpd_incidents' table in our Postgres instance.
    '''
    # create table to insert data into as necessary
    create_postgres_table()

    # create Engine object to connect to DB
    engine = create_engine(f"postgresql://{os.getenv("POSTGRES_USER")}:{os.getenv("POSTGRES_PASSWORD")}@localhost:5433/cpd_db")
        
    # insert data into Postgres DB into the 'cpd_incidents' table
    df.to_sql('cpd_incidents', engine, if_exists='replace')

Things to note about the code above:

  • Similar to how we fetched our app token for extracting our data, we’ll fetch our Postgres credentials from a .env file.
  • To load the DataFrame containing our transformed data into Postgres, we’ll use the pandas.DataFrame.to_sql(). It’s a simple way to insert DataFrame data into any database supported by SQLAlchemy.

Defining the Data Pipeline

We’ve implemented the individual components of the ETL process. Now, we’re ready to encapsulate these components into a pipeline.

There are many tools available to use for orchestrating pipelines defined in python. Two popular options are Apache Airflow and Prefect.

For it’s simplicity, we’ll proceed with defining our pipeline using Prefect. We need to do the following to get started:

  • Install Prefect in our development environment.
  • Get a Prefect API server. Since we don’t want to manage our own infrastructure to run Prefect, we’ll sign up for for Prefect Cloud.

For more information on Prefect setup, check out the docs

Next, we must add the following decorators to our code:

  • @task -> Add this to each function that implements a component of our ETL pipeline (i.e. our extract, validate, transform, and load functions).
  • @flow -> Add this decorator to the function that encapsulates the ETL components into an executable pipeline.

If you look back at our extract, validate, transform, and load code, you’ll see that we added the @task decorator to these functions.

Now, let’s define our ETL pipeline that executes these tasks. We’ll put the following in a separate file, etl_pipeline.py.

from extract import extract_data
from validate import validate_data
from transform import transform_data
from load import load_into_postgres
from prefect import flow

@flow(name="cpd_incident_etl", log_prints=True) # Our pipeline will appear as 'cpd_incident_etl' in the Prefect UI. All print outputs will be displayed in Prefect.
def etl():
    '''
    Execute the ETL pipeline:
    - Extract CPD incident data from the Socrata API
    - Validate and transform the extracted data to prepare it for storage
    - Import the transformed data into Postgres 
    '''
    print("Extracting data...")
    extracted_df = extract_data()

    print("Performing data quality checks...")
    validated_df = validate_data(extracted_df)

    print("Performing data transformations...")
    transformed_df = transform_data(validated_df)

    print("Importing data into Postgres...")
    load_into_postgres(transformed_df)

    print("ETL complete!")

if __name__ == "__main__":
    # CPD data is expected to be updated daily (https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data)
    # Thus, we'll execute our pipeline on a daily basis (at midnight)
    etl.serve(name="cpd-pipeline-deployment", cron="0 0 * * *")

Things to note about the code:

  • @flow(name=”cpd_incident_etl”, log_prints=True) -> this names our pipeline “cpd_incident_etl”, which will be reflected in the Prefect UI. The output of all our print statements will be logged in Prefect.
  • etl.serve(name=”cpd-pipeline-deployment”, cron=”0 0 * * *”) -> this creates a deployment of our pipeline, named “cpd-pipeline-deployment”, that runs every day at midnight.
Our flow will show up on Prefect’s home page.
The “Deployments” tab in the Prefect UI shows us our deployed flows.

Now that we’ve created our pipeline to load our data into PostgreSQL, it’s time to visualize it. 

There are many approaches we could take to visualize our data. Some notable options include:

Both are good options. Without going into too much detail behind each BI tool, we’ll use Metabase to make a simple dashboard.

  • Metabase is an open-source BI and embedded analytics tool that makes data visualization and analysis simple.
  • Connecting Metabase to our data sources and deploying it is straightforward, compared to other BI tools (ex: Apache Superset).

In the future, if we want to have more customization over our visuals/reports, we can consider using other tools. For now, Metabase will do for creating a POC.

Metabase allows you to choose between using its cloud version or managing a self-hosted instance. Metabase Cloud offeres several payment plans, but you can create a self-hosted instance of Metabase for free using Docker. We’ll define our Metabase instance in our compose file.

  • Since we’re self-hosting, we also have to define the Metabase application database, which contains the metadata that Metabase needs to query your data sources (in our case, postgres_cpd). 
services: 
  postgres_cpd: # postgres instance for CPD ETL pipeline
    image: postgres:16
    container_name: postgres_cpd_dev
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: cpd_db
    ports:
      - "5433:5432" # Postgres is already on port 5432 on my local machine
    volumes: 
      - pgdata_cpd:/var/lib/postgresql/data
    restart: unless-stopped
    networks:
      - metanet1
  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_dev
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8081:80"
    depends_on:
      - postgres_cpd
    networks:
      - metanet1
  metabase: # taken from https://www.metabase.com/docs/latest/installation-and-operation/running-metabase-on-docker
    image: metabase/metabase:latest
    container_name: metabase
    hostname: metabase
    volumes:
      - /dev/urandom:/dev/random:ro
    ports:
      - "3000:3000"
    environment:
      MB_DB_TYPE: postgres
      MB_DB_DBNAME: metabaseappdb
      MB_DB_PORT: 5432 
      MB_DB_USER: ${METABASE_DB_USER}
      MB_DB_PASS: ${METABASE_DB_PASSWORD}
      MB_DB_HOST: postgres_metabase # must match container name of postgres_mb (Metabase Postgres instance)
    networks:
      - metanet1
    healthcheck:
      test: curl --fail -I http://localhost:3000/api/health || exit 1
      interval: 15s
      timeout: 5s
      retries: 5
  postgres_mb: # postgres instance for managing Metabase instance
    image: postgres:16
    container_name: postgres_metabase # other services must use this name to communicate with this container
    hostname: postgres_metabase # internal identifier, doesn't impact communication with other services (helpful for logs)
    environment:
      POSTGRES_USER: ${METABASE_DB_USER}
      POSTGRES_DB: metabaseappdb
      POSTGRES_PASSWORD: ${METABASE_DB_PASSWORD}
    ports:
      - "5434:5432" 
    volumes:
      - pgdata_mb:/var/lib/postgresql/data
    networks:
      - metanet1

# Here, we'll define separate volumes to isolate DB configuration & data files for each Postgres database.
# Our Postgres DB for our application should store its config/data separately from the Postgres DB our Metabase service relies on.
volumes:
  pgdata_cpd:
  pgdata_mb:

# define the network over which all the services will communicate
networks:
  metanet1:
    driver: bridge # TO DO: 'bridge' is the default network - services will be able to communicate with each other using their service names

To create our Metabase instance, we made the following changes to our compose file:

  • Added two services: metabase (our Metabase instance) and postgres_mb (our Metabase instance’s application database). 
  • Defined an additional volume, pgdata_mb. This will store the data for the Metabase application database (postgres_mb). 
  • Defined the network over which the services will communicate, metanet1

Without going into too much detail, let’s break down the metabase and postgres_mb services. 

Our Metabase instance (metabase):

  • This service will be exposed on port 3000 on the host machine and within the container. If we’re running this service on our local machine, we’ll be able to access it at localhost:3000.
  • We connect Metabase to it’s application database by ensuring that the MB_DB_HOST, MB_DB_PORT, and MB_DB_NAME environment variables match up with the container name, ports, and database name listed under the postgres_mb service.

For more information on how to run Metabase in Docker, check out the docs.

After setting up Metabase, you’ll be prompted to connect Metabase to your data source. 

Metabase can connect to a wide variety of data sources.

After selecting a PostgreSQL data source, we can specify the following connection string to connect Metabase to our PostgreSQL instance, substituting your credentials as necessary:

postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@postgres_cpd:5432/cpd_db
Connecting to PostgreSQL in Metabase by specifying your connection string.

After setting up the connection, we can create our dashboard. You can create a wide variety of visuals in Metabase, so we won’t go into the specifics here. 

Let’s revisit the example dashboard that we displayed at the beginning of this post. This dashboard nicely summarizes recent and historical trends in reported CPD incidents.

CPD Incident Trends dashboard created in Metabase.

From this dashboard, we can see the following:

  • Most incidents are reported to the CPD in the mid-late afternoon.
  • An overwhelming majority of reported incidents are of the “INCIDENT” type.
  • The number of reported incidents peaked around August-October of 2025, and has been decreasing steadily ever since.

Luckily for us, Metabase will query our database whenever we load this dashboard, so we won’t have to worry about this dashboard displaying stale data.

Check out the Git repo here if you want to dive deeper into the implementation.


Wrap-up and Future Work

Thanks for reading! Let’s briefly recap what we built:

  • We built a data pipeline to extract, transform, and load Cambridge Police Log data into a self-hosted PostgreSQL database.
  • We deployed this pipeline using Prefect and scheduled it to run daily.
  • We created a self-hosted instance of Metabase, connected it to our PostgreSQL database, and created a dashboard to visualize recent and historical crime trends in Cambridge, MA.

There are many ways to build upon this project, including but not limited to:

  • Creating additional visualizations (geospatial heatmap) to visualize crime frequencies in different areas within Cambridge. This would require transforming our street location data into latitude/longitude coordinates.
  • Deploying our self-hosted pipeline and services off of our local machine.
  • Consider joining this data with other datasets for insightful cross-domain analysis. For instance, perhaps we could join this dataset to demographic/census data (using street location) to see whether areas of different demographic makeup within Cambridge have different incident rates.

If you have any other ideas for how to extend upon this project, or you would’ve built things differently, I’d love to hear it in the comments!

The author has created all images in this article.


Sources & GitHub

Prefect:

Metabase:

Docker:

GitHub Repo:

CPD Daily Police Log Dataset:

Share this Article
Please enter CoinGecko Free Api Key to get this plugin works.