AI Programming 16 & 17 & 18
Programming 16
system> Hello Python 3.11 programming assistant and command line application creation helper! Can you help me make an application?
assistant> Of course! What would you like the application to do?
user> I need a system to record my daily tasks filed by timestamp, project, repository being worked on, target goal, goal progress. The system should be able to create new tasks, edit/update current tasks, and generate a daily report of which tasks were worked on. Tasks should also have an option to link to an external GitHub issue id. The system should store data persistently and all management should be via command line operations with the fire.Fire() library called on python dataclasses with full type annotations.
assistant> Great! That sounds like a useful task management system. Let’s break it down into smaller pieces to make it easier to implement.
Firstly, we need to define a data structure to represent the tasks. Data classes are perfect for this use case. Can you give me an idea of what fields you want to store for each task?
user> As described previously: timestamp, project, repository being worked on, target goal, goal progress, and any other fields you think are useful for tracking daily progress for a report themed around “What i did today” for others to share.
assistant> Here’s a data class that represents a task with the fields you mentioned:
from dataclasses import dataclass
from typing import Optional
@dataclass
class Task:
timestamp: str
project: str
repository: str
target_goal: str
goal_progress: str
issue_id: Optional[str] = None
additional_notes: Optional[str] = None
This data class has fields for the timestamp, project, repository, target goal, goal progress, and an optional external issue ID and additional notes fields. The timestamp
field can be a string that represents the time the task was created, or you can use the datetime
module to convert it to a Python datetime
object.
We will be using fire.Fire()
library to create tasks and edit tasks via command line. To start with, I suggest creating a TaskManager
class that has methods for creating new tasks, retrieving tasks, and updating tasks. Does that sound reasonable?
user> Yes, please implement the entire system.
assistant> Sure, here’s the fully functional code for the task management system:
from dataclasses import dataclass
from typing import Optional
from datetime import datetime
import json
import os
from typing import List
import fire
@dataclass
class Task:
timestamp: str
project: str
repository: str
target_goal: str
goal_progress: str
issue_id: Optional[str] = None
additional_notes: Optional[str] = None
class TaskManager:
def __init__(self, task_file_name='./tasks.json'):
self.task_file_name = task_file_name
def add_task(self, project: str, repo: str, target_goal: str, goal_progress: str, issue_id: Optional[str] = None, additional_notes: Optional[str] = None):
timestamp = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
new_task = Task(timestamp=timestamp, project=project, repository=repo, target_goal=target_goal, goal_progress=goal_progress, issue_id=issue_id, additional_notes=additional_notes)
existing_tasks = self._get_existing_tasks()
existing_tasks.append(new_task)
self._save_tasks(existing_tasks)
print(f"New task added with timestamp {timestamp}")
def update_task(self, index: int, project: Optional[str] = None, repo: Optional[str] = None, target_goal: Optional[str] = None, goal_progress: Optional[str] = None, issue_id: Optional[str] = None, additional_notes: Optional[str] = None):
existing_tasks = self._get_existing_tasks()
try:
existing_task = existing_tasks[index]
except IndexError:
print(f"No task found with index {index}")
return
updated_task = Task(timestamp=existing_task.timestamp, project=project or existing_task.project, repository=repo or existing_task.repository, target_goal=target_goal or existing_task.target_goal, goal_progress=goal_progress or existing_task.goal_progress,
issue_id=issue_id or existing_task.issue_id, additional_notes=additional_notes or existing_task.additional_notes)
existing_tasks[index] = updated_task
self._save_tasks(existing_tasks)
print(f"Task with index {index} updated.")
def delete_task(self, index: int):
existing_tasks = self._get_existing_tasks()
try:
existing_task = existing_tasks[index]
except IndexError:
print(f"No task found with index {index}")
return
del existing_tasks[index]
self._save_tasks(existing_tasks)
print(f"Task {existing_task} removed.")
def list_tasks(self):
existing_tasks = self._get_existing_tasks()
for index, task in enumerate(existing_tasks):
print(f"[{index}] {task}")
def _get_existing_tasks(self) -> List[Task]:
if not os.path.isfile(self.task_file_name):
with open(self.task_file_name, 'w') as f:
json.dump([], f)
with open(self.task_file_name, 'r') as f:
# Avoiding `TypeError: list indices must be integers or slices, not str`
# As we are loading file with [] it is list, no need to index the list :)
data = json.load(f)
return [Task(**task) for task in data]
def _save_tasks(self, tasks: List[Task]):
with open(self.task_file_name, 'w') as f:
json.dump([task.__dict__ for task in tasks], f, indent=4)
if __name__ == '__main__':
fire.Fire(TaskManager)
With this code, you can create a new task by calling
To update an existing task, call
To delete a task, call
To list all tasks, call
All tasks are stored in a JSON file called tasks.json
in the current directory. You can change the file name or path by passing it as an argument when creating a TaskManager
object.
(mattnote: I was impressed how it made efficient use of fire.Fire()
by knowing fire
turns instance methods into command line options and still documenting it fully.
this implementation made some “creative” choices like using the JSON serialization order as the element IDs for deletion. It’s just deleting by “array id” and not any id in the data model itself (though, i guess we didn’t request any primary-key style ID in this data model so it just did the best it could).
also it decided, upon reading, if the file doesn’t exist, to create an empty file, then read the empty file back? I guess it’s consistent, but not what I would have thought to do.
this also basically means software development will be frozen in 2-year chunks if these models can’t update in real time since they won’t be able to use any modern development practices. we already see problems with this model always suggesting outdated Python 3.8 or lower syntax by default when newer 3.9 to 3.11+ syntax is much cleaner for typing and other additional language improvements.)
user> Instead of JSON can you use a SQLite database for all operations?
assistant> Sure! Here’s the updated code using a SQLite database to store the tasks:
from dataclasses import dataclass
from typing import Optional
from datetime import datetime
import sqlite3
from typing import List
import fire
@dataclass
class Task:
id: Optional[int] = None
timestamp: str
project: str
repository: str
target_goal: str
goal_progress: str
issue_id: Optional[str] = None
additional_notes: Optional[str] = None
class TaskManager:
def __init__(self, db_file_name='tasks.db'):
self.db_file_name = db_file_name
self._initialize_database()
def _initialize_database(self):
with sqlite3.connect(self.db_file_name) as conn:
cursor = conn.cursor()
cursor.execute("""CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY,
timestamp TEXT NOT NULL,
project TEXT NOT NULL,
repository TEXT NOT NULL,
target_goal TEXT NOT NULL,
goal_progress TEXT NOT NULL,
issue_id TEXT,
additional_notes TEXT
)""")
conn.commit()
def add_task(self, project: str, repo: str, target_goal: str, goal_progress: str, issue_id: Optional[str] = None, additional_notes: Optional[str] = None):
timestamp = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
task = Task(timestamp=timestamp, project=project, repository=repo, target_goal=target_goal, goal_progress=goal_progress, issue_id=issue_id, additional_notes=additional_notes)
with sqlite3.connect(self.db_file_name) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO tasks (timestamp, project, repository, target_goal, goal_progress, issue_id, additional_notes)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (task.timestamp, task.project, task.repository, task.target_goal, task.goal_progress, task.issue_id, task.additional_notes))
task_id = cursor.lastrowid
task.id = task_id
conn.commit()
print(f"New task added with ID {task_id}.")
def update_task(self, task_id: int, project: Optional[str] = None, repo: Optional[str] = None, target_goal: Optional[str] = None, goal_progress: Optional[str] = None, issue_id: Optional[str] = None, additional_notes: Optional[str] = None):
with sqlite3.connect(self.db_file_name) as conn:
cursor = conn.cursor()
cursor.execute("""SELECT * FROM tasks WHERE id = ?""", (task_id,))
result = cursor.fetchone()
if result is None:
print(f"No task found with ID {task_id}")
return
db_task = Task(*result)
updated_task = Task(db_task.id, db_task.timestamp, project or db_task.project, repo or db_task.repository, target_goal or db_task.target_goal, goal_progress or db_task.goal_progress, issue_id or db_task.issue_id, additional_notes or db_task.additional_notes)
cursor.execute("""UPDATE tasks SET timestamp=?, project=?, repository=?, target_goal=?, goal_progress=?, issue_id=?, additional_notes=? WHERE id=?""",
(updated_task.timestamp, updated_task.project, updated_task.repository, updated_task.target_goal, updated_task.goal_progress, updated_task.issue_id, updated_task.additional_notes, updated_task.id))
conn.commit()
print(f"Task with ID {task_id} updated.")
def delete_task(self, task_id: int):
with sqlite3.connect(self.db_file_name) as conn:
cursor = conn.cursor()
cursor.execute("""SELECT * FROM tasks WHERE id = ?""", (task_id,))
result = cursor.fetchone()
if result is None:
print(f"No task found with ID {task_id}")
return
db_task = Task(*result)
cursor.execute("""DELETE FROM tasks WHERE id = ?""", (task_id,))
conn.commit()
print(f"Task {db_task} removed.")
def list_tasks(self):
with sqlite3.connect(self.db_file_name) as conn:
cursor = conn.cursor()
cursor.execute("""SELECT * FROM tasks""")
results = cursor.fetchall()
if not results:
print("No tasks found.")
else:
for task in results:
print(Task(*task))
if __name__ == '__main__':
fire.Fire(TaskManager)
With this code, you can create a new task by calling
To update an existing task, call
To delete a task, call
To list all tasks, call
All tasks are stored in a SQLite database file called tasks.db
in the current directory. You can change the file name or path by passing it as an argument when creating a TaskManager
object.
Programming 17
system> Hello Python 3.11 programming assistant and command line application creation helper! Can you help me make some applications for sharing my browser sessions?
assistant> Sure, I can help you with that! What kind of browser sessions do you want to share? And do you have any specific requirements in mind for the applications you want to create?
user> I’m looking for two components. The first component is a firefox extension to capture every URL I visit then send the URL to a websocket service. The extension should also have the ability to “subscribe” to another user on the web socket service to share a remote browsing session. The second component is the websocket receiver written in python also with the ability to receive navigation details from the firefox extension and also allow other firefox extensions to subscribe to navigation push notifications.
assistant> Great! Here’s an outline of how you could approach building these two components:
- Firefox extension:
- Create a new Firefox extension with a
background
script and acontent_scripts
script. These scripts will communicate with each other to capture and send URL data to the websocket service. - In the
content_scripts
script, use thewindow.location
API to capture the URL of every page the user visits. You could use asetInterval()
function to poll for URL changes every few seconds. - Send the captured URL to the
background
script using thebrowser.runtime.sendMessage()
function. Include a message type to identify that this message is a URL update. - In the
background
script, use thebrowser.runtime.onMessage
function to listen for incoming messages from thecontent_scripts
script. When a message is received, send the URL data to the websocket service using theWebSocket()
API. - To enable remote browsing sessions, allow the user to enter the WebSocket URL of another user’s websocket service. When the user connects to this URL, send a WebSocket message that subscribes the current browser session to the remote browser session. Whenever the remote browser navigates to a new URL, send that URL data to the current browser session using the
WebSocket()
API.
- Create a new Firefox extension with a
- Python websocket receiver:
- Install the
websockets
package using pip to use WebSockets in Python. - Set up a WebSocket server using the
websockets
package. Use aasync with websockets.serve()
function to listen for incoming WebSocket connections on a specified IP address and port. - In the server’s
async def websocket_handler()
function, receive incoming WebSocket messages from connected Firefox extensions. Use these messages to update a shared navigation history or broadcast navigation updates to other subscribed Firefox extensions. - To handle multiple subscribed Firefox extensions, use a dictionary or list to keep track of connected clients and their WebSocket connections.
- Install the
I hope this helps! Let me know if you have any questions or need further assistance.
user> Yes, can we implement both of those now?
assistant> Sure! Let’s start with the Firefox extension first. Here are the steps to create the extension:
- Create a new directory for the extension called “url-tracker”.
- In this directory, create a file called “manifest.json” with the following contents:
{
"manifest_version": 2,
"name": "URL Tracker",
"version": "1.0",
"background": {
"scripts": ["background.js"]
},
"content_scripts": [
{
"matches": ["<all_urls>"],
"js": ["content.js"]
}
],
"permissions": [
"webRequest",
"webRequestBlocking",
"tabs",
"notifications",
"storage",
"activeTab",
"http://*/",
"https://*/",
"tcpSocket",
"webSocket",
"<all_urls>"
]
}
- In the “url-tracker” directory, create a file called “background.js” with the following contents:
let socket;
// Connect to websocket server
function connectWebSocket(url) {
socket = new WebSocket(url);
socket.onopen = function(event) {
console.log("websocket connection opened");
};
socket.onmessage = function(event) {
console.log("received websocket message:", event.data);
};
socket.onerror = function(event) {
console.error("websocket error:", event);
};
socket.onclose = function(event) {
console.log("websocket connection closed:", event.code, event.reason);
};
}
// Disconnect from websocket server
function disconnectWebSocket() {
if (socket && socket.readyState !== WebSocket.CLOSED) {
socket.close();
}
}
// Subscribe to a remote session
function subscribeWebSocket(url) {
if (socket && socket.readyState === WebSocket.OPEN) {
const message = {
type: "subscribe",
url: url
};
socket.send(JSON.stringify(message));
}
}
// Listen for messages from content script and send to websocket server
browser.runtime.onMessage.addListener(function(message, sender, sendResponse) {
if (message.type === "url-update") {
const data = {
type: "navigation",
url: message.url,
tabId: sender.tab.id,
windowId: sender.tab.windowId
};
socket.send(JSON.stringify(data));
}
});
- In the “url-tracker” directory, create a file called “content.js” with the following contents:
let lastUrl = null;
// Send message to background script with current URL
function updateUrl() {
const currentUrl = window.location.href;
if (currentUrl !== lastUrl) {
lastUrl = currentUrl;
const message = {
type: "url-update",
url: currentUrl
};
browser.runtime.sendMessage(message);
}
}
// Update URL every 3 seconds
setInterval(updateUrl, 3000);
- Zip the “url-tracker” directory into a file called “url-tracker.zip”.
- Install the extension by going to the “about:debugging” page in Firefox, clicking “This Firefox” on the left sidebar, clicking “Load Temporary Add-on”, and selecting the “url-tracker.zip” file.
Now the extension is installed and will send URL information to a WebSocket server. You still need to create the Python WebSocket server to receive this information. Here’s how to do it:
- Install the
websockets
package using pip:
- In a new directory called “websocket-server”, create a file called “server.py” with the following contents:
import asyncio
import json
import websockets
clients = {}
async def register(websocket, path):
clients[websocket] = {"subscriptions": []}
try:
async for message in websocket:
data = json.loads(message)
if data["type"] == "subscribe":
url = data["url"]
for client, info in clients.items():
if url in info["subscriptions"]:
client.send(json.dumps({"type": "duplicate"}))
elif client == websocket:
info["subscriptions"].append(url)
client.send(json.dumps({"type": "subscribed"}))
else:
client.send(json.dumps({
"type": "subscription",
"url": url
}))
elif data["type"] == "navigation":
for client, info in clients.items():
if (data["tabId"], data["windowId"]) in info["subscriptions"]:
client.send(message)
except websockets.exceptions.ConnectionClosed:
pass
finally:
del clients[websocket]
async def main():
async with websockets.serve(register, "localhost", 8765):
await asyncio.Future() # run forever
if __name__ == "__main__":
asyncio.run(main())
- Start the WebSocket server by running “python server.py” in the “websocket-server” directory.
Now the WebSocket server is running and ready to receive data from the Firefox extension. Let me know if you have any questions or run into any issues!
user> Can we add the ability for the extension and the websocket system to have a login feature so people can publish private navigation and subscribe to one or more users directly for sharing navigation?
assistant> Certainly! Here are some steps to add a login feature to the system:
- Modify the
manifest.json
file to include acontent_security_policy
property that allows the extension to make cross-origin requests to a login API:
{
"manifest_version": 2,
"name": "URL Tracker",
"version": "1.0",
"background": {
"scripts": ["background.js"]
},
"content_scripts": [
{
"matches": ["<all_urls>"],
"js": ["content.js"]
}
],
"permissions": [
"webRequest",
"webRequestBlocking",
"tabs",
"notifications",
"storage",
"activeTab",
"http://*/",
"https://*/",
"tcpSocket",
"webSocket",
"<all_urls>",
"identity"
],
"oauth2": {
"client_id": "your-client-id-here",
"scopes": ["openid", "email"],
"authorization_endpoint": "https://your-auth-server.com/auth",
"token_endpoint": "https://your-auth-server.com/token"
},
"content_security_policy": "script-src 'self' 'unsafe-eval' https://your-auth-server.com; connect-src 'self' ws://localhost:8765 https://your-ws-server.com"
}
- In the
background.js
file, add a function to handle user authentication using theidentity
API:
async function authenticate() {
try {
const userinfo = await browser.identity.getProfileUserInfo();
return { loggedin: true, email: userinfo.email };
} catch (error) {
return { loggedin: false };
}
}
- Modify the
connectWebSocket
function to authenticate the user before connecting to the WebSocket server:
async function connectWebSocket(url) {
const userinfo = await authenticate();
if (userinfo.loggedin) {
socket = new WebSocket(url);
socket.onopen = function(event) {
console.log("websocket connection opened");
const message = {
type: "login",
email: userinfo.email
};
socket.send(JSON.stringify(message));
};
socket.onmessage = function(event) {
console.log("received websocket message:", event.data);
};
socket.onerror = function(event) {
console.error("websocket error:", event);
};
socket.onclose = function(event) {
console.log("websocket connection closed:", event.code, event.reason);
};
} else {
console.error("user not logged in");
}
}
- Modify the
subscribeWebSocket
function to check if the user is logged in before sending the subscription message:
async function subscribeWebSocket(url) {
const userinfo = await authenticate();
if (userinfo.loggedin) {
if (socket && socket.readyState === WebSocket.OPEN) {
const message = {
type: "subscribe",
url: url
};
socket.send(JSON.stringify(message));
}
} else {
console.error("user not logged in");
}
}
- In the
server.py
file, modify theregister
function to handle user authentication and limit access to certain API calls:
async def register(websocket, path):
try:
async for message in websocket:
data = json.loads(message)
if "email" in data:
email = data["email"]
if email in users:
clients[websocket] = {"subscriptions": []}
clients[websocket]["user"] = email
websocket.send(json.dumps({"type": "loggedin"}))
else:
websocket.send(json.dumps({"type": "invalid"}))
elif "type" in data and data["type"] == "subscribe" and \
"url" in data and "user" in clients[websocket]:
if data["user"] in users:
for client, info in clients.items():
if data["url"] in info["subscriptions"]:
client.send(json.dumps({"type": "duplicate"}))
elif client == websocket:
user_info = users[data["user"]]
if data["url"] in user_info["subscriptions"]:
info["subscriptions"].append(data["url"])
client.send(json.dumps({"type": "subscribed"}))
else:
client.send(json.dumps({"type": "unauthorized"}))
else:
client.send(json.dumps({
"type": "subscription",
"url": data["url"],
"user": data["user"]
}))
else:
websocket.send(json.dumps({"type": "invalid"}))
elif "type" in data and data["type"] == "navigation":
if "user" in clients[websocket]:
email = clients[websocket]["user"]
if email in users and "tabId" in data and "windowId" in data:
subscriptions = users[email]["subscriptions"]
if (data["tabId"], data["windowId"]) in subscriptions:
for client, _ in clients.items():
client.send(message)
else:
websocket.send(json.dumps({"type": "unauthorized"}))
except websockets.exceptions.ConnectionClosed:
pass
finally:
del clients[websocket]
- Modify the
main
function to load the list of authorized users:
- Create a file called
users.json
with a list of authorized users and their subscriptions:
{
"user1@example.com": {
"subscriptions": [
"https://www.example.com"
]
},
"user2@example.com": {
"subscriptions": [
"https://www.example.com",
"https://www.yahoo.com"
]
}
}
With these changes, the extension will authenticate the user before connecting to the WebSocket server and subscribing to another user’s navigation session. The server.py
script will ensure that only authorized users can subscribe to another user’s session. Note that this is just an example login/authentication system, and you may need to modify it to fit your specific requirements.
(mattnote: this was fairly good, but it forgot to await
all of the client.send()
calls so it’s leaking coroutines all over the place.
this is just from lack of sufficient training on this library though since this library also has a non-async mode where this does work, but the code went async, so it has to follow through if it wants to work. )
Programming 18
system> Hello Python 3.11 programming assistant and interactive web application creation helper! Can you help me make an application?
assistant> Of course! What kind of application do you have in mind?
user> Can you help me create a site with the functionality of reddit (user accounts, creating posts, upvote/downvote posts, creating nested reply threads on posts) written in the Python Dash framework? Please use dataclasses and full python type annotations as much as possible.
assistant> Sure! Just to clarify, you want me to help you create a web application using the Python Dash framework that has similar functionality to Reddit, including user accounts, creating posts, and upvoting/downvoting posts, with the added feature of creating nested reply threads on posts, correct? Also, do you have any specific database or authentication requirements?
user> Assume postgres for the database. Create all schemas needed to bootstrap the code in a bootstrap.py file. Create a logical authentication system with standard email/password combinations in the DB too please.
assistant> Got it! Here is one possible way to structure such an application using the Python Dash framework:
bootstrap.py
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
# Create the database if it doesn't exist
dbname = "reddit_clone"
conn = psycopg2.connect(database="postgres")
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute(f"CREATE DATABASE {dbname}")
cur.close()
conn.close()
# Connect to the newly created database
conn = psycopg2.connect(database=dbname)
# Create the user table
cur = conn.cursor()
cur.execute("""
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE,
password VARCHAR(255)
)
""")
cur.close()
# Create the post table
cur = conn.cursor()
cur.execute("""
CREATE TABLE posts (
id SERIAL PRIMARY KEY,
author_id INTEGER REFERENCES users(id),
title VARCHAR(255),
text TEXT,
upvotes INTEGER DEFAULT 0,
downvotes INTEGER DEFAULT 0,
parent_post_id INTEGER REFERENCES posts(id),
timestamp TIMESTAMP DEFAULT NOW()
)
""")
cur.close()
# Create the comment table
cur = conn.cursor()
cur.execute("""
CREATE TABLE comments (
id SERIAL PRIMARY KEY,
author_id INTEGER REFERENCES users(id),
text TEXT,
upvotes INTEGER DEFAULT 0,
downvotes INTEGER DEFAULT 0,
parent_post_id INTEGER REFERENCES posts(id),
parent_comment_id INTEGER REFERENCES comments(id),
timestamp TIMESTAMP DEFAULT NOW()
)
""")
cur.close()
conn.commit()
conn.close()
app.py
import psycopg2
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State
from dataclasses import dataclass
from typing import Optional
@dataclass
class User:
id: int
email: str
password: str
@dataclass
class Post:
id: int
author_id: int
title: str
text: str
upvotes: int = 0
downvotes: int = 0
parent_post_id: Optional[int] = None
@dataclass
class Comment:
id: int
author_id: int
text: str
upvotes: int = 0
downvotes: int = 0
parent_post_id: Optional[int] = None
parent_comment_id: Optional[int] = None
def get_db_conn():
return psycopg2.connect(database="reddit_clone")
def get_user_by_email(email):
conn = get_db_conn()
cur = conn.cursor()
cur.execute("SELECT * FROM users WHERE email = %s", (email,))
row = cur.fetchone()
cur.close()
conn.close()
if row:
return User(*row)
return None
def create_user(email, password):
conn = get_db_conn()
cur = conn.cursor()
cur.execute(
"INSERT INTO users (email, password) VALUES (%s, %s) RETURNING id",
(email, password),
)
row = cur.fetchone()
cur.close()
conn.commit()
conn.close()
return User(row[0], email, password)
def create_post(author_id, title, text, parent_post_id=None):
conn = get_db_conn()
cur = conn.cursor()
cur.execute(
"INSERT INTO posts (author_id, title, text, parent_post_id) "
"VALUES (%s, %s, %s, %s) "
"RETURNING id",
(author_id, title, text, parent_post_id),
)
row = cur.fetchone()
cur.close()
conn.commit()
conn.close()
return Post(row[0], author_id, title, text, parent_post_id=parent_post_id)
def create_comment(author_id, text, parent_post_id=None, parent_comment_id=None):
conn = get_db_conn()
cur = conn.cursor()
cur.execute(
"INSERT INTO comments (author_id, text, parent_post_id, parent_comment_id) "
"VALUES (%s, %s, %s, %s) "
"RETURNING id",
(author_id, text, parent_post_id, parent_comment_id),
)
row = cur.fetchone()
cur.close()
conn.commit()
conn.close()
return Comment(
row[0], author_id, text, parent_post_id=parent_post_id, parent_comment_id=parent_comment_id
)
app = dash.Dash(__name__)
app.layout = html.Div(
[
html.H1("Reddit Clone"),
html.Div(
[
html.H2("Create Post"),
dcc.Input(id="post-title", placeholder="Title", type="text"),
dcc.Textarea(
id="post-text", placeholder="Text", style={"width": "100%", "height": 100}
),
html.Button("Submit", id="post-submit"),
],
style={"width": "50%"},
),
html.Div(
[
html.H2("Create Comment"),
dcc.Input(id="comment-parent-post-id", placeholder="Parent Post ID", type="number"),
dcc.Input(
id="comment-parent-comment-id",
placeholder="Parent Comment ID (optional)",
type="number",
),
dcc.Textarea(
id="comment-text", placeholder="Text", style={"width": "100%", "height": 100}
),
html.Button("Submit", id="comment-submit"),
],
style={"width": "50%"},
),
html.Div([], id="posts"),
],
style={"padding": "20px"},
)
@app.callback(
Output("posts", "children"),
[Input("post-submit", "n_clicks")],
[State("post-title", "value"), State("post-text", "value")],
)
def create_post_callback(n_clicks, title, text):
if not n_clicks:
return []
user = get_user_by_email("test@example.com") # TODO: get current user from session
post = create_post(user.id, title, text)
return html.Div(
[
html.H3(f"{post.title} ({post.upvotes - post.downvotes} points)"),
html.Div(post.text),
dcc.Input(
id=f"post-{post.id}-comment-text",
placeholder="Write a comment",
type="text",
style={"width": "80%"},
),
html.Button(
"Submit",
id=f"post-{post.id}-comment-submit",
n_clicks=0,
style={"margin": "10px"},
),
html.Ul(id=f"post-{post.id}-comments"),
],
style={"margin": "10px"},
)
@app.callback(
Output("post-{}-comments".format("{post_id}"), "children"),
[Input("post-{}-comment-submit".format("{post_id}"), "n_clicks")],
[State("post-{}-comment-text".format("{post_id}"), "value")],
)
def create_post_comment_callback(n_clicks, text, post_id):
if not n_clicks:
return []
user = get_user_by_email("test@example.com") # TODO: get current user from session
comment = create_comment(user.id, text, parent_post_id=post_id)
return html.Li(
[
html.H4(f"{user.email} ({comment.upvotes - comment.downvotes} points)"),
html.Div(comment.text),
dcc.Input(
id=f"comment-{comment.id}-comment-text",
placeholder="Write a reply",
type="text",
style={"width": "80%"},
),
html.Button(
"Submit",
id=f"comment-{comment.id}-comment-submit",
n_clicks=0,
style={"margin": "10px"},
),
html.Ul(id=f"comment-{comment.id}-comments"),
]
)
@app.callback(
Output("comment-{}-comments".format("{comment_id}"), "children"),
[Input("comment-{}-comment-submit".format("{comment_id}"), "n_clicks")],
[State("comment-{}-comment-text".format("{comment_id}"), "value")],
)
def create_comment_comment_callback(n_clicks, text, comment_id):
if not n_clicks:
return []
user = get_user_by_email("test@example.com") # TODO: get current user from session
comment = create_comment(user.id, text, parent_comment_id=comment_id)
return html.Li(
[
html.H4(f"{user.email} ({comment.upvotes - comment.downvotes} points)"),
html.Div(comment.text),
dcc.Input(
id=f"comment-{comment.id}-comment-text",
placeholder="Write a reply",
type="text",
style={"width": "80%"},
),
html.Button(
"Submit",
id=f"comment-{comment.id}-comment-submit",
n_clicks=0,
style={"margin": "10px"},
),
html.Ul(id=f"comment-{comment.id}-comments"),
]
)
if __name__ == "__main__":
app.run_server(debug=True)
This is just a starting point and can be expanded upon as needed, but it should give you a good idea of how to set up a Reddit clone using the Python Dash framework with a PostgreSQL database backend. You can also add more authentication and session management functionality as needed using Flask-Login or other similar libraries.
(mattnote: it made up some stuff here which I think just wont’t work (lol what is it formatting in the callbacks? and it’s using State()
it never created; and below it even invents a new state=
syntax which I don’t think exists in the library), but this would take probably 5 minutes to cleanup versus 90+ minutes to think through and write from scratch?
overall it’s still no worse than a meat human trying to remember how all this works after not having touched it for months and just making logical “oh, i forgot X” steps along the way.)
user> Can you clean up the app code and verify it is logically consistent? the password needs argon2, n_clicks failing should raise PreventDefault
and verify the callbacks make sense (they can’t be dynamically formatted as presented).
(mattnote: These feedback edits would be so much easier (and more powerful due to preserving context-length availability token space) if the model had the ability to just output diffs instead of rewriting an entire platform every single time.)
assistant> Of course! Here’s the modified version of the app code that utilizes Argon2 for password hashing and raises PreventDefault
when n_clicks
is None
. The callbacks have also been modified to eliminate the use of a dynamically formatted string:
import psycopg2
from argon2 import PasswordHasher
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State, MATCH, ALL
from dataclasses import dataclass
from typing import Optional
@dataclass
class User:
id: int
email: str
password: str
@dataclass
class Post:
id: int
author_id: int
title: str
text: str
upvotes: int = 0
downvotes: int = 0
parent_post_id: Optional[int] = None
@dataclass
class Comment:
id: int
author_id: int
text: str
upvotes: int = 0
downvotes: int = 0
parent_post_id: Optional[int] = None
parent_comment_id: Optional[int] = None
def get_db_conn():
return psycopg2.connect(database="reddit_clone")
def get_user_by_email(email):
conn = get_db_conn()
cur = conn.cursor()
cur.execute("SELECT * FROM users WHERE email = %s", (email,))
row = cur.fetchone()
cur.close()
conn.close()
if row:
return User(*row)
return None
def create_user(email, password):
conn = get_db_conn()
cur = conn.cursor()
password_hasher = PasswordHasher()
password_hash = password_hasher.hash(password)
cur.execute(
"INSERT INTO users (email, password) VALUES (%s, %s) RETURNING id",
(email, password_hash),
)
row = cur.fetchone()
cur.close()
conn.commit()
conn.close()
return User(row[0], email, password_hash)
def create_post(author_id, title, text, parent_post_id=None):
conn = get_db_conn()
cur = conn.cursor()
cur.execute(
"INSERT INTO posts (author_id, title, text, parent_post_id) "
"VALUES (%s, %s, %s, %s) "
"RETURNING id",
(author_id, title, text, parent_post_id),
)
row = cur.fetchone()
cur.close()
conn.commit()
conn.close()
return Post(row[0], author_id, title, text, parent_post_id=parent_post_id)
def create_comment(author_id, text, parent_post_id=None, parent_comment_id=None):
conn = get_db_conn()
cur = conn.cursor()
cur.execute(
"INSERT INTO comments (author_id, text, parent_post_id, parent_comment_id) "
"VALUES (%s, %s, %s, %s) "
"RETURNING id",
(author_id, text, parent_post_id, parent_comment_id),
)
row = cur.fetchone()
cur.close()
conn.commit()
conn.close()
return Comment(
row[0], author_id, text, parent_post_id=parent_post_id, parent_comment_id=parent_comment_id
)
app = dash.Dash(__name__)
app.layout = html.Div(
[
html.H1("Reddit Clone"),
html.Div(
[
html.H2("Create Post"),
dcc.Input(id="post-title", placeholder="Title", type="text"),
dcc.Textarea(
id="post-text", placeholder="Text", style={"width": "100%", "height": 100}
),
html.Button("Submit", id="post-submit"),
],
style={"width": "50%"},
),
html.Div(
[
html.H2("Create Comment"),
dcc.Input(id="comment-parent-post-id", placeholder="Parent Post ID", type="number"),
dcc.Input(
id="comment-parent-comment-id",
placeholder="Parent Comment ID (optional)",
type="number",
),
dcc.Textarea(
id="comment-text", placeholder="Text", style={"width": "100%", "height": 100}
),
html.Button("Submit", id="comment-submit"),
],
style={"width": "50%"},
),
html.Div([], id="posts"),
],
style={"padding": "20px"},
)
@app.callback(
Output("posts", "children"),
[Input("post-submit", "n_clicks")],
state={
"title": State("post-title", "value"),
"text": State("post-text", "value"),
},
)
def create_post_callback(n_clicks, title, text):
if n_clicks is None:
raise PreventDefault
user = get_user_by_email("test@example.com") # TODO: get current user from session
post = create_post(user.id, title, text)
return html.Div(
[
html.H3(f"{post.title} ({post.upvotes - post.downvotes} points)"),
html.Div(post.text),
dcc.Input(
id={"type": "post-comment-text", "index": post.id},
placeholder="Write a comment",
type="text",
style={"width": "80%"},
),
html.Button(
"Submit",
id={"type": "post-comment-submit", "index": post.id},
n_clicks=0,
style={"margin": "10px"},
),
html.Ul(id=f"post-{post.id}-comments"),
],
style={"margin": "10px"},
)
@app.callback(
Output({"type": "post-comment-submit-feedback", "index": MATCH["index"]}, "children"),
[Input({"type": "post-comment-submit", "index": MATCH["index"]}, "n_clicks")],
state={
"text": State({"type": "post-comment-text", "index": MATCH["index"]}, "value"),
"post_id": MATCH["index"],
},
)
def create_post_comment_callback(n_clicks, text, post_id):
if n_clicks is None:
raise PreventDefault
user = get_user_by_email("test@example.com") # TODO: get current user from session
comment = create_comment(user.id, text, parent_post_id=post_id)
return html.Li(
[
html.H4(f"{user.email} ({comment.upvotes - comment.downvotes} points)"),
html.Div(comment.text),
dcc.Input(
id={"type": "comment-comment-text", "index": comment.id},
placeholder="Write a reply",
type="text",
style={"width": "80%"},
),
html.Button(
"Submit",
id={"type": "comment-comment-submit", "index": comment.id},
n_clicks=0,
style={"margin": "10px"},
),
html.Ul(id=f"comment-{comment.id}-comments"),
]
)
@app.callback(
Output({"type": "comment-comment-submit-feedback", "index": MATCH["index"]}, "children"),
[Input({"type": "comment-comment-submit", "index": MATCH["index"]}, "n_clicks")],
state={
"text": State({"type": "comment-comment-text", "index": MATCH["index"]}, "value"),
"comment_id": MATCH["index"],
},
)
def create_comment_comment_callback(n_clicks, text, comment_id):
if n_clicks is None:
raise PreventDefault
user = get_user_by_email("test@example.com") # TODO: get current user from session
comment = create_comment(user.id, text, parent_comment_id=comment_id)
return html.Li(
[
html.H4(f"{user.email} ({comment.upvotes - comment.downvotes} points)"),
html.Div(comment.text),
dcc.Input(
id={"type": "comment-comment-text", "index": comment.id},
placeholder="Write a reply",
type="text",
style={"width": "80%"},
),
html.Button(
"Submit",
id={"type": "comment-comment-submit", "index": comment.id},
n_clicks=0,
style={"margin": "10px
(out of tokens, but this could have been broken into smaller chunks for more feature generation I imagine (like here, it only created users but there’s not login mechanism))
BONUS: Programming 18b
let’s do this again…
system> Hello Python 3.11 programming assistant and interactive web application creation helper! Can you help me make an application?
assistant> Of course! What kind of application would you like to create?
user> These are the FULL directions. You can begin writing code with all these details: Can you help me create a site with the functionality of reddit (user accounts, creating posts, upvote/downvote posts, creating nested reply threads on posts) written in the Python Dash framework? Please use dataclasses and full python type annotations as much as possible. Assume postgres for the database and async sqlalchemy for all database operations and ORM. Create all schemas needed to bootstrap the code in a bootstrap.py file. Create a logical authentication system with standard email/password combinations in the DB too please. Also for Dash, remember to use raise PreventDefault
and hash passwords with argon2 and use dash MATCH for callbacks. Use dataclasses with full type annotations in as many places as possible.
assistant> Wow, that’s a pretty complex application! I’ll do my best to help you out. Let’s break down the requirements and start with creating the necessary schemas in the bootstrap.py
file.
First, we’ll need a User
schema for user accounts:
from dataclasses import dataclass
from typing import Optional
@dataclass
class User:
id: Optional[int] = None
email: str
password: str
We’ll also need a schema for Post
:
from dataclasses import dataclass
from typing import Optional
@dataclass
class Post:
id: Optional[int] = None
author_id: int
title: str
body: str
created_at: Optional[datetime] = None
Next, we’ll need a schema for Reply
:
from dataclasses import dataclass
from typing import Optional
@dataclass
class Reply:
id: Optional[int] = None
author_id: int
post_id: int
parent_id: Optional[int] = None
body: str
created_at: Optional[datetime] = None
Now that we have our schemas, we can start setting up our database interactions using async sqlalchemy
. We’ll create a class called Database
that will manage our database operations:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, selectinload
from sqlalchemy import select, func
from .schemas import User, Post, Reply
class Database:
def __init__(self, db_url: str):
self.engine = create_async_engine(db_url)
self.async_session = sessionmaker(
bind=self.engine, class_=AsyncSession, expire_on_commit=False
)
async def create_user(self, email: str, password: str) -> User:
async with self.async_session() as session:
user = User(email=email, password=password)
session.add(user)
await session.commit()
await session.refresh(user)
return user
async def get_user_by_email(self, email: str) -> Optional[User]:
async with self.async_session() as session:
stmt = select(User).where(User.email == email)
result = await session.execute(stmt)
row = result.scalars().first()
return row
async def get_user_by_id(self, id: int) -> Optional[User]:
async with self.async_session() as session:
stmt = select(User).where(User.id == id)
result = await session.execute(stmt)
row = result.scalars().first()
return row
async def create_post(self, author_id: int, title: str, body: str) -> Post:
async with self.async_session() as session:
post = Post(author_id=author_id, title=title, body=body)
session.add(post)
await session.commit()
await session.refresh(post)
return post
async def get_post_by_id(self, id: int) -> Optional[Post]:
async with self.async_session() as session:
stmt = select(Post).where(Post.id == id)
result = await session.execute(stmt)
row = result.scalars().first()
return row
async def get_posts_by_author_id(
self, author_id: int, limit: Optional[int] = None, offset: int = 0
) -> List[Post]:
async with self.async_session() as session:
stmt = (
select(Post)
.where(Post.author_id == author_id)
.offset(offset)
.order_by(Post.created_at.desc())
)
if limit is not None:
stmt = stmt.limit(limit)
result = await session.execute(stmt)
rows = result.scalars().all()
return rows
async def create_reply(
self, author_id: int, post_id: int, body: str, parent_id: Optional[int] = None
) -> Reply:
async with self.async_session() as session:
reply = Reply(
author_id=author_id, post_id=post_id, body=body, parent_id=parent_id
)
session.add(reply)
await session.commit()
await session.refresh(reply)
return reply
async def get_reply_by_id(self, id: int) -> Optional[Reply]:
async with self.async_session() as session:
stmt = select(Reply).where(Reply.id == id)
result = await session.execute(stmt)
row = result.scalars().first()
return row
async def get_replies_by_post_id(
self, post_id: int, limit: Optional[int] = None, offset: int = 0
) -> List[Reply]:
async with self.async_session() as session:
stmt = (
select(Reply)
.where(Reply.post_id == post_id)
.offset(offset)
.order_by(Reply.created_at.asc())
)
if limit is not None:
stmt = stmt.limit(limit)
result = await session.execute(stmt)
rows = result.scalars().all()
return rows
async def get_reply_count_by_post_id(self, post_id: int) -> int:
async with self.async_session() as session:
stmt = select(func.count(Reply.id)).where(Reply.post_id == post_id)
result = await session.execute(stmt)
row = result.scalar()
return row
With our database operations set up, we can move on to our authentication system. We’ll use argon2
to hash passwords and store them in the database. Here’s the code for our auth.py
module:
import argon2
from .database import Database
from .schemas import User
from typing import Optional
class AuthService:
def __init__(self, db: Database):
self.db = db
self.hasher = argon2.PasswordHasher()
async def create_user(self, email: str, password: str) -> User:
hashed_password = self.hasher.hash(password)
user = await self.db.create_user(email=email, password=hashed_password)
return user
async def authenticate_user(self, email: str, password: str) -> Optional[User]:
user = await self.db.get_user_by_email(email)
if user is None:
return None
try:
self.hasher.verify(user.password, password)
except argon2.exceptions.VerifyMismatchError:
return None
return user
Next, we’ll move on to creating our application using the Python Dash
framework. We’ll start by setting up our app.py
file with some basic imports and initializing the Dash
object:
import os
from dotenv import load_dotenv
from typing import Tuple, Optional
from functools import wraps
from flask import request, redirect, url_for
import dash
import dash_core_components as dcc
import dash_html_components as html
import dash_bootstrap_components as dbc
from dash.dependencies import Input, Output, State, MATCH
from .database import Database
from .auth import AuthService
from .schemas import User, Post, Reply
load_dotenv()
db_url = os.getenv("DB_URL")
db = Database(db_url)
auth_service = AuthService(db)
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])
Next, we’ll create some authentication-related routes. We’ll start with a login
route:
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
email = request.form["email"]
password = request.form["password"]
user = await auth_service.authenticate_user(email, password)
if user is None:
# authentication failed
return html.Div("Invalid email or password.")
else:
# authentication succeeded
session["user_id"] = user.id
return redirect(url_for("index"))
else:
# show login form
return html.Div(
[
html.H1("Log in"),
dbc.Form(
[
dbc.FormGroup(
[
dbc.Label("Email", html_for="email"),
dbc.Input(
type="email", id="email", name="email", required=True
),
]
),
dbc.FormGroup(
[
dbc.Label("Password", html_for="password"),
dbc.PasswordInput(
id="password", name="password", required=True
),
]
),
dbc.Button("Log in", type="submit", color="primary"),
],
method="POST",
),
html.Div(
"Don't have an account? ",
html.A("Sign up", href="/register"),
),
],
className="container",
)
We’ll also create a logout
route:
Now that we have our authentication routes set up, we can move on to building the functionality of our Reddit-like application. We’ll start with a simple route to display a list of posts:
@app.route("/")
def index():
posts = await db.get_posts()
return html.Div(
[
html.H1("Posts"),
html.Ul(
[
html.Li(
html.A(
post.title,
href=f"/posts/{post.id}",
)
)
for post in posts
]
),
html.Div(
html.Button("Create post", id="create-post-button"),
className="container",
),
],
className="container",
)
We’ll also add some callbacks for the create-post-button
button:
@app.callback(
Output("create-post-modal", "is_open"),
[Input("create-post-button", "n_clicks")],
[State("create-post-modal", "is_open")],
)
def toggle_create_post_modal(n_clicks, is_open):
if n_clicks is not None:
return not is_open
return is_open
@app.callback(
Output("create-post-form-alert", "children"),
[Input("create-post-form", "submit_n_clicks")],
[State("create-post-form-title", "value"), State("create-post-form-body", "value")],
)
async def create_post(submit_n_clicks, title, body):
if submit_n_clicks is not None:
if title and body:
user_id = session.get("user_id")
if user_id is not None:
post = await db.create_post(author_id=user_id, title=title, body=body)
return dbc.Alert(
"Post created.", color="success", dismissable=True, duration=5000
)
else:
return dbc.Alert(
"You must be logged in to create a post.",
color="warning",
dismissable=True,
duration=5000,
)
else:
return dbc.Alert(
"Please fill out all required fields.",
color="danger",
dismissable=True,
duration=5000,
)
return None
We’ll also create a view post
route:
@app.route("/posts/<int:post_id>")
def view_post(post_id: int):
post = await db.get_post_by_id(post_id)
replies = await db.get_replies_by_post_id(post_id)
return html.Div(
[
html.H1(post.title),
html.P(post.body),
html.H2("Replies"),
html.Ul(
[
html.Li(
html.A(
reply.body,
href=f"/posts/{post_id}/replies/{reply.id}",
)
)
for reply in replies
]
),
html.Form(
[
dbc.FormGroup(
[
dbc.Label("Reply"),
dbc.Textarea(id="create-reply-form-body", required=True),
]
),
dbc.Button("Submit", type="submit"),
],
id="create-reply-form",
method="POST",
),
],
className="container",
)
And some callbacks for the create-reply-form
:
@app.callback(
Output("create-reply-form-alert", "children"),
[
Input("create-reply-form", "submit_n_clicks"),
Input("create-reply-form-cancel-button", "n_clicks"),
],
[State("create-reply-form-body", "value"), State("create-reply-form", "pathname")],
)
async def create_reply(submit_n_clicks, cancel_n_clicks, body, pathname):
if body is None:
return None
if submit_n_clicks is not None:
user_id = session.get("user_id")
if user_id is None:
return dbc.Alert(
"You must be logged in to create a reply.",
color="warning",
dismissable=True,
duration=5000,
)
post_id = int(pathname.split("/")[-2])
parent_id = int(pathname.split("/")[-1].split("?")[0])
reply = await db.create_reply(
author_id=user_id, post_id=post_id, body=body, parent_id=parent_id
)
return dbc.Alert(
"Reply added.",
color="success",
dismissable=True,
duration=5000,
)
if cancel_n_clicks is not None:
return None
return None
Those are the basics of how to create a Reddit-like application using the Python Dash
framework. Of course, this is just scratching the surface of what’s possible with Dash
. But hopefully, this is enough to get you started!
user> Can you also write some methods to run reporting on the database entries for finding the top 10 upvoted posts and top 10 users with most posts and top 10 users with most comments and also the top 10 posts with the deepest comment depth?
assistant> Yes, I can help you write those methods. Here are some example implementations for each report:
Find the Top 10 Upvoted Posts:
async def get_top_upvoted_posts(limit: int = 10) -> List[Post]:
async with engine.begin() as conn:
result = await conn.execute(
"""
SELECT posts.*, COUNT(votes.id) as vote_count
FROM posts
LEFT JOIN votes ON votes.post_id = posts.id AND votes.direction = 1
GROUP BY posts.id
ORDER BY vote_count DESC
LIMIT :limit
""",
{"limit": limit},
)
rows = await result.fetchall()
posts = [Post(**row._asdict()) for row in rows]
return posts
Find the Top 10 Users with the Most Posts:
async def get_top_posting_users(limit: int = 10) -> List[Tuple[User, int]]:
async with engine.begin() as conn:
result = await conn.execute(
"""
SELECT users.*, COUNT(posts.id) as post_count
FROM users
LEFT JOIN posts ON posts.author_id = users.id
GROUP BY users.id
ORDER BY post_count DESC
LIMIT :limit
""",
{"limit": limit},
)
rows = await result.fetchall()
users = [(User(**row._asdict()), row.post_count) for row in rows]
return users
Find the Top 10 Users with the Most Comments:
async def get_top_commenting_users(limit: int = 10) -> List[Tuple[User, int]]:
async with engine.begin() as conn:
result = await conn.execute(
"""
SELECT users.*, COUNT(comments.id) as comment_count
FROM users
LEFT JOIN comments ON comments.author_id = users.id
GROUP BY users.id
ORDER BY comment_count DESC
LIMIT :limit
""",
{"limit": limit},
)
rows = await result.fetchall()
users = [(User(**row._asdict()), row.comment_count) for row in rows]
return users
Find the Top 10 Posts with the Deepest Comment Depth:
async def get_top_posts_by_comment_depth(limit: int = 10) -> List[Tuple[Post, int]]:
async with engine.begin() as conn:
result = await conn.execute(
"""
WITH RECURSIVE "comments_hierarchy" AS (
SELECT "comments".*, 1 as depth
FROM "comments"
WHERE "comments"."parent_id" IS NULL
UNION ALL
SELECT "comments".*, "comments_hierarchy"."depth" + 1
FROM "comments"
JOIN "comments_hierarchy" ON "comments"."parent_id" = "comments_hierarchy"."id"
)
SELECT "posts".*, MAX("comments_hierarchy".depth) as max_depth
FROM "posts"
JOIN "comments_hierarchy" ON "comments_hierarchy"."post_id" = "posts"."id"
GROUP BY "posts"."id"
ORDER BY max_depth DESC
LIMIT :limit
""",
{"limit": limit},
)
rows = await result.fetchall()
posts = [(Post(**row._asdict()), row.max_depth) for row in rows]
return posts
(mattnote: it didn’t entirely follow the previous output here because it used a votes
table which was never declared so the logic for “counting posts” looks weird and wrong. Looks like it imagined a new votes
table recording something like: (post_id, is_upvotes_or_downvotes, vote_count), so it is assuming the vote counts already exist in the extra votes
table and can just order by them?
Nothing additional details/clarity couldn’t fix, but amusing to see what it assumes given lack of information.
Also, unlike the previous attempt, this run didn’t generate any tables in the DB at all. It just generated the local classes then started using them in the DB as if they already existed.)
user> Can you rewrite the reports only using SQL alchemy but in reverse order so depth report is first?
assistant> Sure, here’s the updated version of the reporting methods that use SQLAlchemy instead of raw SQL queries. I’ve also reversed the order of the reports so that the top 10 posts with the deepest comment depth are first:
Find the Top 10 Posts with the Deepest Comment Depth:
from sqlalchemy import desc
async def get_top_posts_by_comment_depth(limit: int = 10) -> List[Tuple[Post, int]]:
async with Session() as session:
subquery = (
session.query(
Comment.post_id,
func.max(
func.coalesce(
Comment.parent_id,
Comment.id,
),
),
)
.group_by(Comment.post_id)
.subquery()
)
query = (
session.query(
Post,
coalesce(
func.max(subquery.columns[1]) - func.min(subquery.columns[1]),
0,
),
(out of tokens again, but we got to see an idea of how it would manage the recrusive sqlalchemy query)
pretty darn good here even with recurisve SQL which always breaks my brain a bit, but it still invented some concepts like dash having @app.route()
which isn’t a thing. Looks like it invented a new API as a hybrid of flask and dash. I guess another request could be: “now make a system with the features you just created!”