Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions backend/director/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,41 @@ class EnvPrefix(str, Enum):
GOOGLEAI_ = "GOOGLEAI_"

DOWNLOADS_PATH="director/downloads"

CHAT_NAMING_SYSTEM_PROMPT = """
You are an assistant that generates short, descriptive titles for chat conversations.
The title should summarize the main intent or topic of the user's first message.

Guidelines:

**Output you give should strictly just be the title without any markdown content in plain text**

Keep the title under 6 words.

Use concise, professional phrasing.

Don't include punctuation unless necessary.

Capitalize like a headline (e.g., “Check SQL Query Logic”).

Avoid emojis or filler words.

Example input and outputs:

“Can you give me download links for these videos?” → Generate Video Download Links

“Summarize the lecture video for me” → Lecture Video Summary

“Add a watermark to my demo” → Add Watermark to Video

“Trim the video from 2:10 to 3:45” → Trim Video Segment

“Combine these three clips into one” → Merge Video Clips

“Extract subtitles from this recording” → Extract Video Subtitles

“Translate this video into Spanish” → Translate Video to Spanish

“Generate a short trailer from the full video” → Create Video Trailer

"""
21 changes: 14 additions & 7 deletions backend/director/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,28 +220,31 @@ class OutputMessage(BaseMessage):
status: MsgStatus = MsgStatus.progress

def update_status(self, status: MsgStatus):
"""Update the status of the message and publish the message to the socket. for loading state."""
"""Update the status and broadcast to session room."""
self.status = status
self._publish()

def push_update(self):
"""Publish the message to the socket."""
"""Push real-time update to session room."""
try:
self._publish()
except Exception as e:
print(f"Error in emitting message: {str(e)}")
print(f"Error in emitting update to session {self.session_id}: {str(e)}")

def publish(self):
"""Store the message in the database. for conversation history and publish the message to the socket."""
"""Store in database and broadcast final result to session room."""
self._publish()

def _publish(self):
try:
emit("chat", self.model_dump(), namespace="/chat")
emit("chat", self.model_dump(),
room=self.session_id,
namespace="/chat")
print(f"Emitted message to session room: {self.session_id}")
except Exception as e:
print(f"Error in emitting message: {str(e)}")
self.db.add_or_update_msg_to_conv(**self.model_dump())
print(f"Error emitting to session {self.session_id}: {str(e)}")

self.db.add_or_update_msg_to_conv(**self.model_dump())

Comment on lines +240 to 248
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Persist before emitting to prevent “phantom” messages.

Emit currently happens before DB write, so a failed write can leave clients seeing messages that aren’t in history. Persist first, then emit.

Apply this diff:

-    def _publish(self):
-        try:
-            emit("chat", self.model_dump(), 
-                 room=self.session_id, 
-                 namespace="/chat")
-            print(f"Emitted message to session room: {self.session_id}")
-        except Exception as e:
-            print(f"Error emitting to session {self.session_id}: {str(e)}")
-
-        self.db.add_or_update_msg_to_conv(**self.model_dump())
+    def _publish(self):
+        payload = self.model_dump(exclude={"db"})
+        # Persist first to keep history consistent with real-time updates
+        try:
+            self.db.add_or_update_msg_to_conv(**payload)
+        except Exception as e:
+            print(f"Error persisting message for session {self.session_id}: {e!s}")
+        # Then emit to the session room
+        try:
+            emit("chat", payload, room=self.session_id, namespace="/chat")
+            print(f"Emitted message to session room: {self.session_id}")
+        except Exception as e:
+            print(f"Error emitting to session {self.session_id}: {e!s}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
emit("chat", self.model_dump(),
room=self.session_id,
namespace="/chat")
print(f"Emitted message to session room: {self.session_id}")
except Exception as e:
print(f"Error in emitting message: {str(e)}")
self.db.add_or_update_msg_to_conv(**self.model_dump())
print(f"Error emitting to session {self.session_id}: {str(e)}")
self.db.add_or_update_msg_to_conv(**self.model_dump())
def _publish(self):
payload = self.model_dump(exclude={"db"})
# Persist first to keep history consistent with real-time updates
try:
self.db.add_or_update_msg_to_conv(**payload)
except Exception as e:
print(f"Error persisting message for session {self.session_id}: {e!s}")
# Then emit to the session room
try:
emit("chat", payload, room=self.session_id, namespace="/chat")
print(f"Emitted message to session room: {self.session_id}")
except Exception as e:
print(f"Error emitting to session {self.session_id}: {e!s}")
🧰 Tools
🪛 Ruff (0.13.1)

244-244: Do not catch blind exception: Exception

(BLE001)


245-245: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
In backend/director/core/session.py around lines 240 to 248, the code emits the
chat message before persisting it, which can create “phantom” messages if the DB
write fails; move the DB write
(self.db.add_or_update_msg_to_conv(**self.model_dump())) to occur before emit,
check for exceptions on the DB operation and only proceed to emit if persistence
succeeds, and in the DB-exception path log/raise the error (and avoid emitting)
so clients only see messages that are actually stored.

def format_user_message(message: dict) -> dict:
message_content = message.get("content")
Expand Down Expand Up @@ -393,6 +396,10 @@ def delete(self):
"""Delete the session from the database."""
return self.db.delete_session(self.session_id)

def update(self, **kwargs) -> bool:
"""Update the session in the database."""
return self.db.update_session(self.session_id, **kwargs)

def emit_event(self, event: BaseEvent, namespace="/chat"):
"""Emits a structured WebSocket event to notify all clients about updates."""

Expand Down
22 changes: 22 additions & 0 deletions backend/director/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,28 @@ def add_or_update_context_msg(
"""Update context messages for a session."""
pass

@abstractmethod
def update_session(self, session_id: str, **kwargs) -> bool:
"""Update a session in the database."""
pass

@abstractmethod
def delete_session(self, session_id: str) -> tuple[bool, list]:
"""Delete a session from the database.
:return: (success, failed_components)
"""
pass

@abstractmethod
def make_session_public(self, session_id: str, is_public: bool) -> bool:
"""Make a session public or private."""
pass

@abstractmethod
def get_public_session(self, session_id: str) -> dict:
"""Get a public session by session_id."""
pass

@abstractmethod
def health_check(self) -> bool:
"""Check if the database is healthy."""
Expand Down
93 changes: 89 additions & 4 deletions backend/director/db/postgres/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ def __init__(self):
port=os.getenv("POSTGRES_PORT", "5432"),
)
self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)

initialize_postgres()
def create_session(
self,
session_id: str,
video_id: str,
collection_id: str,
name: str = None,
created_at: int = None,
updated_at: int = None,
metadata: dict = {},
Expand All @@ -47,14 +48,15 @@ def create_session(

self.cursor.execute(
"""
INSERT INTO sessions (session_id, video_id, collection_id, created_at, updated_at, metadata)
VALUES (%s, %s, %s, %s, %s, %s)
INSERT INTO sessions (session_id, video_id, collection_id, name, created_at, updated_at, metadata)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (session_id) DO NOTHING
""",
(
session_id,
video_id,
collection_id,
name,
created_at,
updated_at,
json.dumps(metadata),
Expand Down Expand Up @@ -195,7 +197,7 @@ def delete_context(self, session_id: str) -> bool:
self.conn.commit()
return self.cursor.rowcount > 0

def delete_session(self, session_id: str) -> bool:
def delete_session(self, session_id: str) -> tuple[bool, list]:
failed_components = []
if not self.delete_conversation(session_id):
failed_components.append("conversation")
Expand All @@ -210,6 +212,89 @@ def delete_session(self, session_id: str) -> bool:
success = len(failed_components) < 3
return success, failed_components

def update_session(self, session_id: str, **kwargs) -> bool:
"""Update a session in the database."""
try:
if not kwargs:
return False

allowed_fields = {"name", "video_id", "collection_id", "metadata"}
update_fields = []
update_values = []

for key, value in kwargs.items():
if key not in allowed_fields:
continue
if key == "metadata" and not isinstance(value, str):
value = json.dumps(value)
update_fields.append(f"{key} = %s")
update_values.append(value)

if not update_fields:
return False

update_fields.append("updated_at = %s")
update_values.append(int(time.time()))

update_values.extend([session_id])

query = f"""
UPDATE sessions
SET {', '.join(update_fields)}
WHERE session_id = %s
"""

self.cursor.execute(query, update_values)
self.conn.commit()
return self.cursor.rowcount > 0

except Exception:
logger.exception(f"Error updating session {session_id}")
return False

def make_session_public(self, session_id: str, is_public: bool) -> bool:
"""Make a session public or private."""
try:
query = """
UPDATE sessions
SET is_public = %s, updated_at = %s
WHERE session_id = %s
"""
current_time = int(time.time())
self.cursor.execute(query, (is_public, current_time, session_id))
self.conn.commit()
return self.cursor.rowcount > 0
except Exception as e:
logger.exception(f"Error making session public/private: {e}")
return False

def get_public_session(self, session_id: str) -> dict:
"""Get a public session by session_id."""
try:
query = """
SELECT session_id, video_id, collection_id, name, created_at, updated_at, metadata, is_public
FROM sessions
WHERE session_id = %s AND is_public = TRUE
"""
self.cursor.execute(query, (session_id,))
row = self.cursor.fetchone()
if row:
session = {
"session_id": row["session_id"],
"video_id": row["video_id"],
"collection_id": row["collection_id"],
"name": row["name"],
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"metadata": row["metadata"] if row["metadata"] else {},
"is_public": row["is_public"]
}
return session
return {}
except Exception as e:
logger.exception(f"Error getting public session: {e}")
return {}

def health_check(self) -> bool:
try:
query = """
Expand Down
4 changes: 4 additions & 0 deletions backend/director/db/postgres/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
session_id TEXT PRIMARY KEY,
video_id TEXT,
collection_id TEXT,
name TEXT,
is_public BOOLEAN DEFAULT FALSE,
created_at BIGINT,
updated_at BIGINT,
metadata JSONB
Expand Down Expand Up @@ -68,6 +70,8 @@ def initialize_postgres():
cursor.execute(CREATE_SESSIONS_TABLE)
cursor.execute(CREATE_CONVERSATIONS_TABLE)
cursor.execute(CREATE_CONTEXT_MESSAGES_TABLE)
cursor.execute("ALTER TABLE sessions ADD COLUMN IF NOT EXISTS name TEXT")
cursor.execute("ALTER TABLE sessions ADD COLUMN IF NOT EXISTS is_public BOOLEAN DEFAULT FALSE")
conn.commit()
logger.info("PostgreSQL tables created successfully")
except Exception as e:
Expand Down
Loading