Data wrangling is perhaps the task that takes up the most of a Data Scientist's time. It includes cleaning, transforming, and generally manipulating data from its raw state into something useful. Like many activities, the wrangling process often needs to be refined over time, so it is important to keep track of how a dataset is wrangled so that your team can manage and reproduce the process. Data wrangling, although not always fun, may be the most important activity at any modern company.
There are some companies that specialize in data pipelines, and those pipelines can be complex and very sophisticated. But for this exploration, let's consider the task of turning a text file into a set of words or "tokens", throwing out text that is not useful to us. Let's start simple, and work our way up.
To start, let's define a series of steps to perform wrangling functions on words in a text. We'll use Python's str.translate() method to do some of the work for us. Consider these four functions:
import string

def step1(word):
    trans = str.maketrans("", "", string.punctuation)
    return word.replace("\n", " ").translate(trans)

def step2(word):
    return word.lower()

def step3(word):
    trans = str.maketrans("", "", "0123456789")
    return word.replace("\n", " ").translate(trans)

def step4(word):
    return (all([char in string.ascii_letters for char in word]) and
            len(word) > 0)
step1 is a function that removes all of the punctuation from a word and replaces newlines with spaces. step2 turns a word into lowercase. step3 again uses str.translate() to remove digits. And step4 will be used as a filter: it keeps only non-empty words made up entirely of ASCII letters. You can imagine additional steps, such as stemming.
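For instance, a very crude stemming step might strip a few common suffixes; the step5 below is only an illustrative sketch (a real pipeline would more likely use a dedicated stemmer, such as NLTK's PorterStemmer):

def step5(word):
    # Crude, illustrative stemmer: strip a few common English suffixes.
    for suffix in ("ing", "ed", "es", "s"):
        if word.endswith(suffix) and len(word) > len(suffix) + 2:
            return word[:-len(suffix)]
    return word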
As these are simple functions, if we apply step1 to a word, we’ll get:
>>> step1("Testing---123;")
'Testing123'
Indeed, it has removed the punctuation from the text. We can apply all three functions by wrapping them Russian-doll style around the word:
>>> step3(step2(step1("Testing---123;")))
'testing'
Here we see that functions step1, step2, and step3 have been applied, leaving only the letters "testing". Note that our functions are designed to work in a particular order: step1 should be done before step2, and so on.
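To see why order matters, here is a quick illustration using the functions above: filtering with step4 before cleaning would reject a word that we actually want to keep.

>>> step4("Testing---123;")   # punctuation and digits still present, so the word is rejected
False
>>> step4(step3(step2(step1("Testing---123;"))))   # cleaned first, so it passes
True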
This function-based process is simple to create and simple to use. Of course, we could do all of the processing in a single function. But as the "pipeline" of functions gets longer and more complex, breaking up the process into discrete steps makes it more manageable. In fact, each step might become so complex that different teams work on it.
Okay, so far, so good. But of course, we don’t want to manually apply the function pipeline onto each word. Instead we want to apply it to every word in a list. To do this, we make a very simple function apply():
def apply(step, values):
    return [step(value) for value in values]
Now we can use the same functions on entire lists of words:
>>> apply(step3,
          apply(step2,
                apply(step1,
                      ["Testing---123;", "456---", "Hello!"])))
['testing', '', 'hello']
Ah, yes, we need to remove empty words. step4 is designed exactly for that, but it is a bit more complex to use. It would look like this:
>>> list(filter(step4,
                apply(step3,
                      apply(step2,
                            apply(step1,
                                  ["Testing---123;", "456---", "Hello!"])))))
['testing', 'hello']
That is, because step4 is a filter function that returns True to keep a word and False to remove it, it is applied like so: filter(step4, data).
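For example, applied directly to the cleaned list from above:

>>> list(filter(step4, ['testing', '', 'hello']))
['testing', 'hello']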
There are a few problems with this simple approach:
- The steps are applied from the inside out. That is, the first step, step1, is the innermost function, while step3 is on the outside. Not very intuitive.
- It is very wordy in that we have to repeat the apply() function for each step function.
- Filters (like step4) can’t be used like the other functions.
With these issues in mind, can we abstract the main functionality into a generalized pipeline? I’m imagining a two-step approach:
# First we create a pipeline function:
p = my_pipeline(step1, step2, step3)

# And then we apply it to a dataset:
p(["Testing---123;", "456---", "Hello!"])
How could we define my_pipeline? It turns out to be fairly straightforward:
def my_pipeline(*steps):
    def wrapper(inputs):
        for step in steps:
            inputs = apply(step, inputs)
        return inputs
    return wrapper
That is, my_pipeline is a function that takes a series of step functions, and returns a function that takes a list of words, applies each step in the series, and returns the processed list of words.
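As an aside, the same wrapper can be written as a fold over the steps using functools.reduce; this is just an equivalent sketch (my_pipeline_reduce is a made-up name), not the version we'll use going forward:

from functools import reduce

def my_pipeline_reduce(*steps):
    # Fold the steps over the inputs: each step consumes the previous step's output.
    return lambda inputs: reduce(lambda data, step: apply(step, data), steps, inputs)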
Let’s try it out:
>>> p = my_pipeline(step1, step2, step3)
>>> p(["Testing---123;", "456---", "Hello!"])
['testing', '', 'hello']
It works — we got exactly what we got before! What about the step4 filter function? Let’s leave that for the moment and try out this system on “real” data. Well, it will be real fake data. For these experiments, we’ll create 10,000 documents, each consisting of 10 paragraphs. We’ll use the DocumentGenerator() from the Python package essential_generators.
from essential_generators import DocumentGenerator
import os

gen = DocumentGenerator()

def generate_documents(
    count=10_000,
    paragraphs=10,
    output_folder="documents",
    overwrite=False
):
    os.makedirs(output_folder, exist_ok=True)
    for n in range(count):
        filename = os.path.join(
            output_folder,
            "doc_%05d.txt" % (n + 1)
        )
        if overwrite or not os.path.exists(filename):
            with open(filename, "w") as fp:
                for p in range(paragraphs):
                    fp.write(gen.paragraph() + "\n\n")

generate_documents()
This will take about 30 seconds to generate all of the data. To continue with our simple code, we need to introduce one more step:
def step0(filename):
    return open(filename).read().split(" ")
This step will take a filename, open the file, and split the text on spaces. And we need to make a slight adjustment to our apply() function to handle lists of words, instead of words:
def apply(step, outputs):
    return (step(input) if not isinstance(input, list) else
            [step(i) for i in input] for input in outputs)
I also made one other slight adjustment to apply: it now returns a generator expression rather than a list comprehension by using the surrounding parentheses rather than square brackets. This will delay processing until needed (sometimes called “lazy evaluation”).
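As a quick illustration of what lazy evaluation means here (using the step functions defined earlier; the exact generator representation printed will vary):

>>> lazy = (step2(word) for word in ["Testing", "HELLO"])
>>> lazy                  # nothing has been processed yet
<generator object <genexpr> at 0x...>
>>> list(lazy)            # the work happens only when we consume the generator
['testing', 'hello']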
Now we can build a near-complete pipeline system:
p = my_pipeline(step0, step1, step2, step3)
list(p(["documents/doc_00001.txt"]))
Note that it takes a list of filenames as input. Nice and simple. But there are a few things that I’d still like to see:
- ability to handle filters in a simple way
- ability to run the pipeline in parallel to process datasets quickly
- ability to visualize the pipeline
For these three additions, I’ll refer you to the picopipe project that I developed based on the ideas above. You can pip install it:
pip install picopipe
and run it with the same step functions from above:
from picopipe import pipeline, pfilter

p = pipeline(step0, step1, step2, step3, pfilter(step4))
list(p(["documents/doc_00001.txt"])[0])
Here, pfilter stands for pipeline-filter, and you simply wrap it around the step4 function. I'm pretty happy with the design. Before we look at how fast it runs, here is a rough sketch of the idea behind such a filter wrapper.
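This is not picopipe's actual implementation; the names my_pfilter and my_filter_pipeline and the is_filter attribute are made up here, and it builds on the earlier flat-list version of apply() for clarity. The idea is just to tag a step so the runner applies it as a filter instead of mapping it:

def my_pfilter(step):
    # Tag the step so the runner knows to treat it as a filter rather than a mapper.
    step.is_filter = True
    return step

def my_filter_pipeline(*steps):
    def wrapper(inputs):
        for step in steps:
            if getattr(step, "is_filter", False):
                inputs = [value for value in inputs if step(value)]
            else:
                inputs = apply(step, inputs)  # the simple list-based apply()
        return inputs
    return wrapper

With something like this, my_filter_pipeline(step1, step2, step3, my_pfilter(step4)) behaves like the nested filter(...) expression we wrote by hand earlier. Now, let's see how fast the real pipeline will run.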
First, let’s get all of the document filenames. An easy way to do that is to use glob:
import glob

dataset = glob.glob("documents/doc_*.txt")
And now we can process all of the documents:
results = list(p(dataset))
That takes about 21 seconds on my laptop to process all 10,000 documents. Short and sweet! Can we make that run faster?
Yes! There is now also an n_jobs parameter to the pipeline that indicates the number of jobs to run in parallel. Here is a little bit of code that will process the dataset multiple times, using 0 to 9 threads. How much faster do you think it will run using 9 threads in parallel?
import time

x = []
y = []
for i in range(10):
    start = time.time()
    results = list(p(dataset, n_jobs=i))
    total_time = time.time() - start
    x.append(i)
    y.append(total_time)
That will take a couple of minutes to run. We can then plot the resulting times against the number of threads.
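A minimal matplotlib sketch for such a plot (assuming matplotlib is installed; the axis labels are just my own choices):

import matplotlib.pyplot as plt

# Plot processing time against the number of parallel jobs collected above.
plt.plot(x, y, marker="o")
plt.xlabel("n_jobs (number of threads)")
plt.ylabel("time to process 10,000 documents (seconds)")
plt.show()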
Interesting: the chart levels off rather than continuing to decrease with additional threads. That is, using 9 threads is not 9 times faster than using 1 thread. Why not? Unfortunately, you can't break the law. And there is a law: Amdahl's Law. It basically says that you'll never get an N-times speedup from N workers, because some portion of the work is sequential (plus there is parallelization overhead), and that portion doesn't shrink as you add threads. In this case, I can reduce the time from about 21 seconds down to 8 seconds using 4 threads. Still, not bad!
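As a rough back-of-the-envelope illustration (the 20% sequential fraction below is an assumed number for illustration, not something measured from this pipeline), Amdahl's Law bounds the speedup at 1 / (s + (1 - s) / N) for a sequential fraction s and N workers:

def amdahl_speedup(serial_fraction, n_workers):
    # Theoretical upper bound on speedup, given the fraction of work that stays sequential.
    return 1.0 / (serial_fraction + (1.0 - serial_fraction) / n_workers)

# Assuming (purely for illustration) that 20% of the work is sequential:
for n in (1, 2, 4, 9):
    print(n, round(amdahl_speedup(0.20, n), 2))
# Even with 9 workers, the speedup tops out well below 9x.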
Finally, I’d like to visualize the pipeline. For this part of the project I chose to try out the Mermaid Diagram format. It has gained a lot of support lately, including in github’s repositories. The format is very simple, and easy to create. For github rendering, simply name the file with a .mmd extension. Here is how to generate a mermaid script using picopipe:
from picopipe import to_mermaid

with open("pipeline.mmd", "w") as fp:
    fp.write(to_mermaid(p))
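The generated file is plain Mermaid text. As a rough, hand-written sketch of what such a flowchart can look like (the actual output of to_mermaid() may differ in its details):

flowchart LR
    step0 --> step1
    step1 --> step2
    step2 --> step3
    step3 --> step4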
And here it is shown in github's rendering:
Unfortunately, github doesn’t show the mouseover functionality (defined in CSS). However, if you can set your own CSS, then it works to not only visualize the pipeline, but can show the step code when you mouseover a step box:
The above Mermaid chart with mouseover support was created using Comet's custom panel system (free for all users). It was very easy to create a custom panel that displays Mermaid files. Here is a demo of the above Mermaid chart rendered live: comet.com/dsblank/picopipe/a4c044c1657b464087ec44f67ae22709
That completes our exploration of developing the World's Smallest Data Pipeline Framework, including its parallelization and visualization. You can find all of the code here: github.com/dsblank/picopipe. I hope you found the ideas presented here, and the final module, useful.
Interested in Artificial Intelligence, Machine Learning, or Data Science? Consider a clap and a follow. Doug is Head of Research at comet.com, an ML experiment-tracking and model-monitoring company.