AI Reminds Me Topological Sort Exists
once i had an interview where they described a problem to solve on a whiteboard.
the actual way to solve the problem was “implement topological sort from memory,” but I didn’t know that (sidenote: people who don’t understand the difference between interviewing “academic algorithmists” and “developers” and “programmers” can go to hell), so they just let me fail the problem struggling on a whiteboard for 20 minutes until they said “the actual answer is topological sort you idiot why do you think you deserve to be employed or even exist you are clearly incompetent and should just give up on your life.”
also recently realized i’ve never gotten a job by passing a coding interview. imagine being such a worthless piece of shit who can’t even pass a coding interview? i’m available for parties and corporate events if you need somebody to point and laugh at.
begin gpt-4 continuing to erode any sense of uniqueness or pride we have in our capabilities
system> Hello python algorithm helper! Can you help me write a system to detect paths in control flow then output a tree-based plan resolving all dependencies? I can provide the input format when you are ready.
assistant> Firstly, let’s clarify what we’re dealing with here. Control flow graphs are used in computer science to describe the order in which different parts of a program get executed. Detecting paths in these refers to finding possible ‘routes’ through the program.
Now, I can certainly help you write such a system, but it will be complex, depending on the specifics. However, here’s a simple, illustrative Python ‘sketch’ of what this kind of function could look like.
This function will take in a directed acyclic graph (DAG), represented as an adjacency list (a Python dictionary), and return a list of all paths from the first node to the last node:
```python
def find_paths(graph, start, end, path=[]):
    path = path + [start]
    if start == end:
        return [path]
    if start not in graph:
        return []
    paths = []
    for node in graph[start]:
        if node not in path:
            newpaths = find_paths(graph, node, end, path)
            for newpath in newpaths:
                paths.append(newpath)
    return paths
```
You could use this function like so:
```python
graph = {'A': ['B', 'C'],
         'B': ['D'],
         'C': ['D'],
         'D': ['E'],
         'E': []}

start = 'A'
end = 'E'

paths = find_paths(graph, start, end)
for path in paths:
    print(path)
```
This program will print all the paths from ‘A’ to ‘E’ in the given graph.
To “output a tree-based plan resolving all dependencies” can mean a lot of different things, depending on the specifics of your scenario. But a start could be to use a tree data structure to store the graph and use a depth-first search algorithm to detect all paths.
Again, this is a simple illustration. Real-world problems related to control flow analysis and dependency planning can get very complex, depending on the specifics. Also, this does not account for error handling and edge cases for large inputs and such. You might want to consider all of these factors in your actual implementation.
user> Yes, we basically want to define functions with return values, but the return values can also be input arguments to future functions. We just need to generate a linear order of function calls and also determine which calls can be called concurrently because they have no shared dependencies. The input format is: [dict(name=Name, fun=Function, args=[1, 2, 3]), dict(name=Name2, fun=Function2, args=[Name, 5, 6])] etc. Here, the Name2 call includes the result from Name as an argument to Function2.
assistant> I see, this task involves finding the dependencies between objects and organizing the execution order accordingly.
One approach is to build a Directed Acyclic Graph (DAG) with the nodes as your tasks and edges representing the dependencies between tasks. Once your graph is built, you perform a Topological Sort to find a linear ordering of the tasks such that each task comes before every task that depends on it.
This Python code demonstrates how this might look:
```python
from collections import deque

def build_graph(tasks):
    graph = {name: [] for name in tasks.keys()}
    rev_graph = {name: [] for name in tasks.keys()}  # reversed graph

    for name, task in tasks.items():
        for arg in task['args']:
            if arg in tasks.keys():  # it's a dependency
                graph[arg].append(name)
                rev_graph[name].append(arg)

    return graph, rev_graph

def detect_cycles(graph, tasks):
    color = {name: 0 for name in tasks.keys()}  # 0=white, 1=gray, 2=black
    for name in tasks.keys():
        if color[name] == 0:
            cycle = dfs_visit(graph, color, name)
            if cycle:
                return True
    return False

def dfs_visit(graph, color, name):
    color[name] = 1
    for neighbour in graph[name]:
        if color[neighbour] == 1:
            return True
        elif color[neighbour] == 0:
            if dfs_visit(graph, color, neighbour):
                return True
    color[name] = 2
    return False

def topological_sort(tasks, graph):
    in_degree = {u: 0 for u in tasks.keys()}
    for u in graph:
        for v in graph[u]:
            in_degree[v] += 1

    q = deque()
    for u in in_degree:
        if in_degree[u] == 0:
            q.appendleft(u)

    res = []
    while q:
        u = q.pop()
        res.append(u)
        for v in graph[u]:
            in_degree[v] -= 1
            if in_degree[v] == 0:
                q.appendleft(v)

    return res if len(res) == len(tasks) else None

# example input:
tasks = {
    'Name1': {'fun': 'Function1', 'args': ['1', '2', '3']},
    'Name2': {'fun': 'Function2', 'args': ['Name1', '5', '6']},
    'Name3': {'fun': 'Function3', 'args': ['2', 'Name2', 'Name1']},
    'Name4': {'fun': 'Function4', 'args': ['Name3']}
}

graph, rev_graph = build_graph(tasks)

if detect_cycles(graph, tasks):
    print("Error: The tasks have a circular dependency.")
else:
    order = topological_sort(tasks, graph)
    print("The order of tasks is: " + " -> ".join(order))
```
This code first checks for cycles in your call graph. Circular dependencies among tasks cannot be resolved, so it’s important to check for them. If there are no cycles, it means our graph is a DAG, and we can find a topological ordering. In this task ordering, each task comes before any other task which depends on it.
Please note that this code assumes that the `args` field of each task is a list containing either names of other tasks or standalone arguments. It also assumes that standalone arguments do not coincide with the keys used as task names.
Thus, you can use this code as a guide to build your own system for managing function calls and dependencies.
(mattnote: interestingly (or scarily), this code works perfectly the first time.
also, as always, it generated a viable example test case data structure with good organization by itself)
user> That’s a pretty good answer, but how can we detect when functions can be called in parallel because they share no dependencies?
assistant> If tasks have no dependencies, they can be run in parallel. From the topological sort, tasks at the same level of the sorted graph have no dependencies on each other and thus can be run in parallel.
Let’s modify the topological sort function to find these levels:
```python
def parallelizable_tasks(tasks, graph):
    in_degree = {u: 0 for u in tasks.keys()}
    for u in graph:
        for v in graph[u]:
            in_degree[v] += 1

    q = deque()
    for u in in_degree:
        if in_degree[u] == 0:
            q.appendleft(u)

    levels = {}
    level = 0
    while q:
        next_level = deque()
        while q:
            u = q.pop()
            for v in graph[u]:
                in_degree[v] -= 1
                if in_degree[v] == 0:
                    next_level.appendleft(v)
            if u not in levels:
                levels[u] = level
        level += 1
        q = next_level

    parallel_tasks = {}
    for u, lv in levels.items():
        if lv not in parallel_tasks:
            parallel_tasks[lv] = []
        parallel_tasks[lv].append(u)

    return parallel_tasks

parallel_tasks = parallelizable_tasks(tasks, graph)
for lv in sorted(parallel_tasks.keys()):
    print(f"Level {lv}: " + ", ".join(parallel_tasks[lv]))
```
This function is an extended version of the topological sort. It uses the same logic, but tasks that are pushed onto the queue within the same loop have the same level and can be run concurrently, and are thus put under the same level in the output.
At the end, parallel_tasks will contain a mapping from levels (integer) to list of tasks that can be run at the same time. Tasks at lower levels must be executed before tasks at higher levels.
Note that this only provides one of potentially many valid orderings, and actual execution order can vary depending on factors like system load and task length variance.

Please note: Parallel execution of tasks can be more efficiently achieved with Python’s concurrent.futures or multiprocessing modules. However, process/thread creation and management is outside the scope of the algorithm and relates more to system architecture and setup.
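As a rough sketch of that idea (the `levels` dict and the lambda “tasks” below are made-up placeholders, not part of the system above): submit each level’s tasks to a pool together, and wait for the whole level to finish before starting the next.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical level map: tasks within a level share no dependencies
levels = {0: [lambda: 1 + 2, lambda: 3 * 4],
          1: [lambda: 5 - 1]}

results = []
with ThreadPoolExecutor() as pool:
    for lv in sorted(levels):
        # tasks in the same level can run concurrently...
        futures = [pool.submit(fn) for fn in levels[lv]]
        # ...but the next level must wait until all of them finish
        results.append([f.result() for f in futures])

print(results)  # [[3, 12], [4]]
```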
(mattnote: this also works perfectly)
user> How can we detect parallelizable tasks using the Python built-in API for graphlib.TopologicalSorter?
assistant> The `graphlib` module introduced in Python 3.9 provides a `TopologicalSorter` class for determining a topological order of a graph that represents task dependencies. To detect parallelizable tasks, we can use the `TopologicalSorter` and determine tasks at the same “level”, which can be run concurrently.

Here is an example of how one might achieve this:
```python
from graphlib import TopologicalSorter

def add_tasks_to_sorter(tasks, sorter):
    for name, task in tasks.items():
        dependencies = [arg for arg in task['args'] if arg in tasks.keys()]
        sorter.add(name, *dependencies)

def find_parallelizable_tasks(tasks):
    sorter = TopologicalSorter()
    add_tasks_to_sorter(tasks, sorter)

    parallelizable_tasks = []
    while sorter.is_active():
        parallelizable_tasks.append(sorter.get_ready())
    return parallelizable_tasks

# example input:
tasks = {
    'Name1': {'fun': 'Function1', 'args': ['1', '2', '3']},
    'Name2': {'fun': 'Function2', 'args': ['Name1', '5', '6']},
    'Name3': {'fun': 'Function3', 'args': ['2', 'Name2', 'Name1']},
    'Name4': {'fun': 'Function4', 'args': ['Name3']}
}

parallelizable_groups = find_parallelizable_tasks(tasks)
for i, group in enumerate(parallelizable_groups):
    print(f"Group {i+1}: {', '.join(group)}")
```
This will print groups of tasks that can be run in parallel. Each group represents a set of tasks that don’t depend on each other and can be executed concurrently. Tasks in the next group can only be started once all tasks in the previous group have finished, because they depend on the results of the previous tasks.
`graphlib.TopologicalSorter.get_ready()` returns all the nodes that have no more unprocessed predecessors, which is exactly what we need to find tasks ready to be run in parallel. We use `graphlib.TopologicalSorter.is_active()` in the loop condition to check if there are any nodes left in the graph.
(mattnote: this does not work perfectly because it forgot the python API requires `sorter.done(*nodes)` at the end of each loop, which is an interesting artifact: gpt-4 wrote perfect code when it wrote the entire algo itself, but when having to use external APIs, it isn’t trained strongly enough on the examples and documentation to provide a working implementation the first time.

also it’s a weird/bad style doing `arg in tasks.keys()` when it’s the same as `arg in tasks`.

this also reveals the model has no problem understanding actual concepts since it created a working solution itself, but it just doesn’t have enough recall. also, how many of us find it easier to write something ourselves we actually understand versus trying to learn/remember random external APIs with edge cases or extra functionality buried in documentation elsewhere? this run would have been perfect if the model also had live realtime documentation for lookaside referencing.)
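for reference, a minimal corrected sketch of that loop (adding the `sorter.done()` call described above, plus the `prepare()` call `graphlib` requires before iterating) might look like:

```python
from graphlib import TopologicalSorter

def find_parallelizable_tasks(tasks):
    sorter = TopologicalSorter()
    for name, task in tasks.items():
        # only args that name other tasks are real dependencies
        sorter.add(name, *[a for a in task['args'] if a in tasks])
    sorter.prepare()  # required before is_active()/get_ready()

    groups = []
    while sorter.is_active():
        ready = sorter.get_ready()
        groups.append(ready)
        sorter.done(*ready)  # mark the batch finished so the next level unblocks
    return groups

tasks = {
    'Name1': {'fun': 'Function1', 'args': ['1', '2', '3']},
    'Name2': {'fun': 'Function2', 'args': ['Name1', '5', '6']},
    'Name3': {'fun': 'Function3', 'args': ['2', 'Name2', 'Name1']},
    'Name4': {'fun': 'Function4', 'args': ['Name3']}
}
result = find_parallelizable_tasks(tasks)
print(result)  # [('Name1',), ('Name2',), ('Name3',), ('Name4',)]
```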
user> What is an easy way to track the final call patterns? Here, we need to track basically Function3(2, Function2(Function1(1, 2, 3)), Function1(1, 2, 3))))
assistant> To track the order of function calls and their arguments, you can modify the `find_parallelizable_tasks` function like this:
```python
def execute_task(name, tasks, results):
    task = tasks[name]
    # Replace argument names with their results
    args = [results[arg] if arg in results else arg for arg in task['args']]
    # Execute the function with its arguments
    result = task['fun'](*args)
    return result

def execute_tasks(tasks):
    sorter = TopologicalSorter()
    add_tasks_to_sorter(tasks, sorter)

    results = {}
    call_patterns = {}
    while sorter.is_active():
        for task in sorter.get_ready():
            results[task] = execute_task(task, tasks, results)
            call_patterns[task] = f"{tasks[task]['fun']}({', '.join(map(str, tasks[task]['args']))})"
        sorter.done(*sorter.get_ready())
    return results, call_patterns

def replace_remaining_calls(call_patterns):
    for name, call in call_patterns.items():
        for arg in call_patterns.keys():
            if arg in call:
                call = call.replace(arg, call_patterns[arg])
        call_patterns[name] = call
    return call_patterns
```
Here, the `execute_tasks` function not only determines the next tasks to execute, but also executes them and stores the results in the `results` dictionary. For tracking the call pattern, it creates a string representation of each function call and stores it in the `call_patterns` dictionary.

After all tasks have been executed, the `replace_remaining_calls` function replaces the remaining task names in the function calls with their respective function calls.

Note, however, that the actual execution of the functions is simulated in this example, as it is not clear from your question how your function names in string format map to actual Python functions and how they should be executed. You may have to adapt the `execute_task` function to your particular case.
Example for task execution:
```python
tasks = {
    'Name1': {'fun': 'Function1', 'args': ['1', '2', '3']},
    'Name2': {'fun': 'Function2', 'args': ['Name1', '5', '6']},
    'Name3': {'fun': 'Function3', 'args': ['2', 'Name2', 'Name1']},
    'Name4': {'fun': 'Function4', 'args': ['Name3']}
}

results, call_patterns = execute_tasks(tasks)
call_patterns = replace_remaining_calls(call_patterns)
for name, call in call_patterns.items():
    print(f"{name}: {call}")
```
This will print the final call patterns in the provided task format. However, you’ll need to replace the function names with actual Python functions to execute them.
(mattnote: here, gpt-4 did use the `sorter.done()` API, but it put the call outside of the loop and also double-called `.get_ready()`, which is wrong and doesn’t let the code run. the proper answer is to just have `sorter.done(task)` inside the loop for each `task`.
this basically solved the entire problem i wanted to try)
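for completeness, a minimal sketch of that fix, with `sorter.done(task)` inside the loop. the `fake_call` helper is a made-up stand-in since the transcript never maps the string function names to real callables:

```python
from graphlib import TopologicalSorter

def fake_call(fun_name, args):
    # hypothetical stand-in: just build the nested call string
    return f"{fun_name}({', '.join(map(str, args))})"

def execute_tasks(tasks):
    sorter = TopologicalSorter()
    for name, task in tasks.items():
        sorter.add(name, *[a for a in task['args'] if a in tasks])
    sorter.prepare()

    results = {}
    while sorter.is_active():
        for task in sorter.get_ready():
            spec = tasks[task]
            # substitute already-computed results for task-name args
            args = [results.get(a, a) for a in spec['args']]
            results[task] = fake_call(spec['fun'], args)
            sorter.done(task)  # mark each task done inside the loop
    return results

tasks = {
    'Name1': {'fun': 'Function1', 'args': ['1', '2', '3']},
    'Name2': {'fun': 'Function2', 'args': ['Name1', '5', '6']},
    'Name3': {'fun': 'Function3', 'args': ['2', 'Name2', 'Name1']},
    'Name4': {'fun': 'Function4', 'args': ['Name3']}
}
results = execute_tasks(tasks)
print(results['Name3'])
# Function3(2, Function2(Function1(1, 2, 3), 5, 6), Function1(1, 2, 3))
```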
Example output (modified to print the fake fn dependencies):