mirror of
https://github.com/aserper/masto-rss.git
synced 2026-01-10 07:43:52 +00:00
Modernize codebase: Use pathlib, logging, dataclasses, and update dependencies
This commit is contained in:
@@ -9,6 +9,7 @@
|
|||||||
[](LICENSE)
|
[](LICENSE)
|
||||||
[](https://www.python.org/downloads/)
|
[](https://www.python.org/downloads/)
|
||||||
[](https://github.com/aserper/masto-rss)
|
[](https://github.com/aserper/masto-rss)
|
||||||
|
[](https://github.com/aserper/masto-rss/network)
|
||||||
|
|
||||||
A simple, lightweight Mastodon bot that automatically posts updates from RSS feeds to the Fediverse. Built with Python and designed to run seamlessly in Docker with multiarch support (amd64 & arm64).
|
A simple, lightweight Mastodon bot that automatically posts updates from RSS feeds to the Fediverse. Built with Python and designed to run seamlessly in Docker with multiarch support (amd64 & arm64).
|
||||||
|
|
||||||
|
|||||||
66
bot.py
66
bot.py
@@ -1,10 +1,16 @@
|
|||||||
"""Mastodon RSS Bot - Core functionality"""
|
"""Mastodon RSS Bot - Core functionality"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import List, Optional, Set
|
||||||
|
|
||||||
import feedparser
|
import feedparser
|
||||||
from mastodon import Mastodon
|
from mastodon import Mastodon
|
||||||
import os
|
|
||||||
import time
|
# Configure logging for this module
|
||||||
from typing import Set, Optional
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class MastodonRSSBot:
|
class MastodonRSSBot:
|
||||||
@@ -16,10 +22,10 @@ class MastodonRSSBot:
|
|||||||
client_secret: str,
|
client_secret: str,
|
||||||
access_token: str,
|
access_token: str,
|
||||||
instance_url: str,
|
instance_url: str,
|
||||||
feed_urls: list[str],
|
feed_urls: List[str],
|
||||||
toot_visibility: str = "public",
|
toot_visibility: str = "public",
|
||||||
check_interval: int = 300,
|
check_interval: int = 300,
|
||||||
state_file: str = "/state/processed_entries.txt",
|
state_file: Path = Path("/state/processed_entries.txt"),
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Initialize the Mastodon RSS bot.
|
Initialize the Mastodon RSS bot.
|
||||||
@@ -37,7 +43,7 @@ class MastodonRSSBot:
|
|||||||
self.feed_urls = feed_urls
|
self.feed_urls = feed_urls
|
||||||
self.toot_visibility = toot_visibility
|
self.toot_visibility = toot_visibility
|
||||||
self.check_interval = check_interval
|
self.check_interval = check_interval
|
||||||
self.state_file = state_file
|
self.state_file = Path(state_file)
|
||||||
|
|
||||||
# Initialize Mastodon client
|
# Initialize Mastodon client
|
||||||
self.mastodon = Mastodon(
|
self.mastodon = Mastodon(
|
||||||
@@ -54,10 +60,13 @@ class MastodonRSSBot:
|
|||||||
Returns:
|
Returns:
|
||||||
Set of URLs that have been processed
|
Set of URLs that have been processed
|
||||||
"""
|
"""
|
||||||
|
if not self.state_file.exists():
|
||||||
|
return set()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(self.state_file, "r") as file:
|
return set(self.state_file.read_text().splitlines())
|
||||||
return set(file.read().splitlines())
|
except Exception as e:
|
||||||
except FileNotFoundError:
|
logger.error(f"Error loading processed entries from {self.state_file}: {e}")
|
||||||
return set()
|
return set()
|
||||||
|
|
||||||
def save_processed_entries(self, processed_entries: Set[str]) -> None:
|
def save_processed_entries(self, processed_entries: Set[str]) -> None:
|
||||||
@@ -68,10 +77,11 @@ class MastodonRSSBot:
|
|||||||
processed_entries: Set of processed entry URLs
|
processed_entries: Set of processed entry URLs
|
||||||
"""
|
"""
|
||||||
# Ensure directory exists
|
# Ensure directory exists
|
||||||
os.makedirs(os.path.dirname(self.state_file), exist_ok=True)
|
try:
|
||||||
|
self.state_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
with open(self.state_file, "w") as file:
|
self.state_file.write_text("\n".join(sorted(processed_entries)))
|
||||||
file.write("\n".join(sorted(processed_entries)))
|
except Exception as e:
|
||||||
|
logger.error(f"Error saving processed entries to {self.state_file}: {e}")
|
||||||
|
|
||||||
def parse_feed(self, feed_url: str) -> Optional[feedparser.FeedParserDict]:
|
def parse_feed(self, feed_url: str) -> Optional[feedparser.FeedParserDict]:
|
||||||
"""
|
"""
|
||||||
@@ -86,12 +96,12 @@ class MastodonRSSBot:
|
|||||||
try:
|
try:
|
||||||
feed = feedparser.parse(feed_url)
|
feed = feedparser.parse(feed_url)
|
||||||
if hasattr(feed, "bozo_exception"):
|
if hasattr(feed, "bozo_exception"):
|
||||||
print(
|
logger.warning(
|
||||||
f"Warning: Feed parsing issue for {feed_url}: {feed.bozo_exception}"
|
f"Feed parsing issue for {feed_url}: {feed.bozo_exception}"
|
||||||
)
|
)
|
||||||
return feed
|
return feed
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error parsing feed {feed_url}: {e}")
|
logger.error(f"Error parsing feed {feed_url}: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def format_status(self, entry: feedparser.FeedParserDict) -> str:
|
def format_status(self, entry: feedparser.FeedParserDict) -> str:
|
||||||
@@ -122,7 +132,7 @@ class MastodonRSSBot:
|
|||||||
self.mastodon.status_post(status, visibility=self.toot_visibility)
|
self.mastodon.status_post(status, visibility=self.toot_visibility)
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error posting to Mastodon: {e}")
|
logger.error(f"Error posting to Mastodon: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def process_feed(self, feed_url: str, processed_entries: Set[str]) -> int:
|
def process_feed(self, feed_url: str, processed_entries: Set[str]) -> int:
|
||||||
@@ -136,10 +146,10 @@ class MastodonRSSBot:
|
|||||||
Returns:
|
Returns:
|
||||||
Number of new entries posted
|
Number of new entries posted
|
||||||
"""
|
"""
|
||||||
print(f"Checking feed: {feed_url}")
|
logger.info(f"Checking feed: {feed_url}")
|
||||||
feed = self.parse_feed(feed_url)
|
feed = self.parse_feed(feed_url)
|
||||||
if not feed or not hasattr(feed, "entries"):
|
if not feed or not hasattr(feed, "entries"):
|
||||||
print(f"No entries found in feed: {feed_url}")
|
logger.warning(f"No entries found in feed: {feed_url}")
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
new_entries_count = 0
|
new_entries_count = 0
|
||||||
@@ -149,13 +159,13 @@ class MastodonRSSBot:
|
|||||||
entry_url = entry.get("link", "")
|
entry_url = entry.get("link", "")
|
||||||
|
|
||||||
if not entry_url:
|
if not entry_url:
|
||||||
print("Skipping entry without URL")
|
logger.debug("Skipping entry without URL")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Check if entry is new
|
# Check if entry is new
|
||||||
if entry_url not in processed_entries:
|
if entry_url not in processed_entries:
|
||||||
title = entry.get("title", "Untitled")
|
title = entry.get("title", "Untitled")
|
||||||
print(f"Found a new RSS item: {title}")
|
logger.info(f"Found a new RSS item: {title}")
|
||||||
|
|
||||||
# Format and post status
|
# Format and post status
|
||||||
status = self.format_status(entry)
|
status = self.format_status(entry)
|
||||||
@@ -163,7 +173,7 @@ class MastodonRSSBot:
|
|||||||
processed_entries.add(entry_url)
|
processed_entries.add(entry_url)
|
||||||
new_entries_count += 1
|
new_entries_count += 1
|
||||||
else:
|
else:
|
||||||
print(f"Failed to post entry: {title}")
|
logger.error(f"Failed to post entry: {title}")
|
||||||
|
|
||||||
return new_entries_count
|
return new_entries_count
|
||||||
|
|
||||||
@@ -174,7 +184,7 @@ class MastodonRSSBot:
|
|||||||
Returns:
|
Returns:
|
||||||
Total number of new entries posted across all feeds
|
Total number of new entries posted across all feeds
|
||||||
"""
|
"""
|
||||||
print("Checking for new RSS items...")
|
logger.info("Checking for new RSS items...")
|
||||||
|
|
||||||
# Load processed entries
|
# Load processed entries
|
||||||
processed_entries = self.load_processed_entries()
|
processed_entries = self.load_processed_entries()
|
||||||
@@ -198,15 +208,15 @@ class MastodonRSSBot:
|
|||||||
try:
|
try:
|
||||||
count = self.process_new_entries()
|
count = self.process_new_entries()
|
||||||
if count > 0:
|
if count > 0:
|
||||||
print(f"Posted {count} new entries")
|
logger.info(f"Posted {count} new entries")
|
||||||
|
|
||||||
print(f"Sleeping for {self.check_interval} seconds...")
|
logger.info(f"Sleeping for {self.check_interval} seconds...")
|
||||||
time.sleep(self.check_interval)
|
time.sleep(self.check_interval)
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print("\nBot stopped by user")
|
logger.info("Bot stopped by user")
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error in main loop: {e}")
|
logger.error(f"Error in main loop: {e}", exc_info=True)
|
||||||
print(f"Retrying in {self.check_interval} seconds...")
|
logger.info(f"Retrying in {self.check_interval} seconds...")
|
||||||
time.sleep(self.check_interval)
|
time.sleep(self.check_interval)
|
||||||
|
|||||||
185
main.py
185
main.py
@@ -1,80 +1,135 @@
|
|||||||
"""Mastodon RSS Bot - Entry point"""
|
import logging
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
from bot import MastodonRSSBot
|
from bot import MastodonRSSBot
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||||
|
handlers=[logging.StreamHandler(sys.stdout)]
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Config:
|
||||||
|
"""Configuration loaded from environment variables."""
|
||||||
|
instance_url: str
|
||||||
|
client_id: str
|
||||||
|
client_secret: str
|
||||||
|
access_token: str
|
||||||
|
feed_urls: List[str] = field(default_factory=list)
|
||||||
|
toot_visibility: str = "public"
|
||||||
|
check_interval: int = 300
|
||||||
|
state_file: Path = field(default_factory=lambda: Path("/state/processed_entries.txt"))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_env(cls) -> "Config":
|
||||||
|
"""Load configuration from environment variables."""
|
||||||
|
instance_url = os.environ.get("MASTODON_INSTANCE_URL")
|
||||||
|
client_id = os.environ.get("MASTODON_CLIENT_ID")
|
||||||
|
client_secret = os.environ.get("MASTODON_CLIENT_SECRET")
|
||||||
|
access_token = os.environ.get("MASTODON_ACCESS_TOKEN")
|
||||||
|
|
||||||
|
if not all([instance_url, client_id, client_secret, access_token]):
|
||||||
|
missing = [
|
||||||
|
k for k, v in {
|
||||||
|
"MASTODON_INSTANCE_URL": instance_url,
|
||||||
|
"MASTODON_CLIENT_ID": client_id,
|
||||||
|
"MASTODON_CLIENT_SECRET": client_secret,
|
||||||
|
"MASTODON_ACCESS_TOKEN": access_token
|
||||||
|
}.items() if not v
|
||||||
|
]
|
||||||
|
raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
|
||||||
|
|
||||||
|
# Parse feeds
|
||||||
|
feed_urls = []
|
||||||
|
|
||||||
|
# 1. Legacy single feed URL
|
||||||
|
if os.environ.get("RSS_FEED_URL"):
|
||||||
|
feed_urls.append(os.environ["RSS_FEED_URL"])
|
||||||
|
|
||||||
|
# 2. Comma-separated list of feeds
|
||||||
|
if os.environ.get("RSS_FEEDS"):
|
||||||
|
feeds = [
|
||||||
|
url.strip() for url in os.environ["RSS_FEEDS"].split(",") if url.strip()
|
||||||
|
]
|
||||||
|
feed_urls.extend(feeds)
|
||||||
|
|
||||||
|
# 3. File containing list of feeds
|
||||||
|
feeds_file = os.environ.get("FEEDS_FILE")
|
||||||
|
if feeds_file:
|
||||||
|
path = Path(feeds_file)
|
||||||
|
if path.exists():
|
||||||
|
try:
|
||||||
|
content = path.read_text().splitlines()
|
||||||
|
file_feeds = [
|
||||||
|
line.strip()
|
||||||
|
for line in content
|
||||||
|
if line.strip() and not line.startswith("#")
|
||||||
|
]
|
||||||
|
feed_urls.extend(file_feeds)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error reading feeds file {feeds_file}: {e}")
|
||||||
|
else:
|
||||||
|
logger.warning(f"Feeds file configured but not found: {feeds_file}")
|
||||||
|
|
||||||
|
# Deduplicate while preserving order
|
||||||
|
unique_feed_urls = list(dict.fromkeys(feed_urls))
|
||||||
|
|
||||||
|
if not unique_feed_urls:
|
||||||
|
raise ValueError("No RSS feeds configured. Please set RSS_FEED_URL, RSS_FEEDS, or FEEDS_FILE.")
|
||||||
|
|
||||||
|
return cls(
|
||||||
|
instance_url=instance_url, # type: ignore # checked above
|
||||||
|
client_id=client_id, # type: ignore
|
||||||
|
client_secret=client_secret,# type: ignore
|
||||||
|
access_token=access_token, # type: ignore
|
||||||
|
feed_urls=unique_feed_urls,
|
||||||
|
toot_visibility=os.environ.get("TOOT_VISIBILITY", "public"),
|
||||||
|
check_interval=int(os.environ.get("CHECK_INTERVAL", "300")),
|
||||||
|
state_file=Path(os.environ.get("PROCESSED_ENTRIES_FILE", "/state/processed_entries.txt"))
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
"""Initialize and run the bot with environment configuration"""
|
"""Initialize and run the bot with environment configuration"""
|
||||||
print("Starting Mastodon RSS Bot...")
|
logger.info("Starting Mastodon RSS Bot...")
|
||||||
|
|
||||||
# Load configuration from environment variables
|
try:
|
||||||
feed_urls = []
|
config = Config.from_env()
|
||||||
|
except ValueError as e:
|
||||||
|
logger.critical(str(e))
|
||||||
|
sys.exit(1)
|
||||||
|
except Exception as e:
|
||||||
|
logger.critical(f"Failed to load configuration: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
# 1. Legacy single feed URL
|
logger.info("Bot configured successfully:")
|
||||||
if os.environ.get("RSS_FEED_URL"):
|
logger.info(f" Instance: {config.instance_url}")
|
||||||
feed_urls.append(os.environ["RSS_FEED_URL"])
|
logger.info(f" Monitoring {len(config.feed_urls)} feed(s):")
|
||||||
|
for url in config.feed_urls:
|
||||||
# 2. Comma-separated list of feeds
|
logger.info(f" - {url}")
|
||||||
if os.environ.get("RSS_FEEDS"):
|
logger.info(f" Visibility: {config.toot_visibility}")
|
||||||
feeds = [
|
logger.info(f" Check interval: {config.check_interval} seconds")
|
||||||
url.strip() for url in os.environ["RSS_FEEDS"].split(",") if url.strip()
|
logger.info(f" State file: {config.state_file}")
|
||||||
]
|
|
||||||
feed_urls.extend(feeds)
|
|
||||||
|
|
||||||
# 3. File containing list of feeds
|
|
||||||
feeds_file = os.environ.get("FEEDS_FILE")
|
|
||||||
if feeds_file and os.path.exists(feeds_file):
|
|
||||||
try:
|
|
||||||
with open(feeds_file, "r") as f:
|
|
||||||
file_feeds = [
|
|
||||||
line.strip()
|
|
||||||
for line in f
|
|
||||||
if line.strip() and not line.startswith("#")
|
|
||||||
]
|
|
||||||
feed_urls.extend(file_feeds)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error reading feeds file {feeds_file}: {e}")
|
|
||||||
|
|
||||||
# Deduplicate while preserving order
|
|
||||||
unique_feed_urls = []
|
|
||||||
seen = set()
|
|
||||||
for url in feed_urls:
|
|
||||||
if url not in seen:
|
|
||||||
unique_feed_urls.append(url)
|
|
||||||
seen.add(url)
|
|
||||||
|
|
||||||
if not unique_feed_urls:
|
|
||||||
print(
|
|
||||||
"Error: No RSS feeds configured. Please set RSS_FEED_URL, RSS_FEEDS, or FEEDS_FILE."
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(
|
bot = MastodonRSSBot(
|
||||||
client_id=os.environ["MASTODON_CLIENT_ID"],
|
client_id=config.client_id,
|
||||||
client_secret=os.environ["MASTODON_CLIENT_SECRET"],
|
client_secret=config.client_secret,
|
||||||
access_token=os.environ["MASTODON_ACCESS_TOKEN"],
|
access_token=config.access_token,
|
||||||
instance_url=os.environ["MASTODON_INSTANCE_URL"],
|
instance_url=config.instance_url,
|
||||||
feed_urls=unique_feed_urls,
|
feed_urls=config.feed_urls,
|
||||||
toot_visibility=os.environ.get("TOOT_VISIBILITY", "public"),
|
toot_visibility=config.toot_visibility,
|
||||||
check_interval=int(os.environ.get("CHECK_INTERVAL", "300")),
|
check_interval=config.check_interval,
|
||||||
state_file=os.environ.get(
|
state_file=config.state_file,
|
||||||
"PROCESSED_ENTRIES_FILE", "/state/processed_entries.txt"
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
print("Bot configured successfully:")
|
|
||||||
print(f" Instance: {os.environ['MASTODON_INSTANCE_URL']}")
|
|
||||||
print(f" Monitoring {len(unique_feed_urls)} feed(s):")
|
|
||||||
for url in unique_feed_urls:
|
|
||||||
print(f" - {url}")
|
|
||||||
print(f" Visibility: {os.environ.get('TOOT_VISIBILITY', 'public')}")
|
|
||||||
print(f" Check interval: {os.environ.get('CHECK_INTERVAL', '300')} seconds")
|
|
||||||
print(
|
|
||||||
f" State file: {os.environ.get('PROCESSED_ENTRIES_FILE', '/state/processed_entries.txt')}"
|
|
||||||
)
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Start the bot
|
# Start the bot
|
||||||
bot.run()
|
bot.run()
|
||||||
|
|
||||||
|
|||||||
350
test_bot.py
350
test_bot.py
@@ -4,6 +4,7 @@ import unittest
|
|||||||
from unittest.mock import Mock, patch, mock_open, MagicMock
|
from unittest.mock import Mock, patch, mock_open, MagicMock
|
||||||
import tempfile
|
import tempfile
|
||||||
import os
|
import os
|
||||||
|
from pathlib import Path
|
||||||
from bot import MastodonRSSBot
|
from bot import MastodonRSSBot
|
||||||
import feedparser
|
import feedparser
|
||||||
|
|
||||||
@@ -37,7 +38,7 @@ class TestMastodonRSSBot(unittest.TestCase):
|
|||||||
self.assertEqual(bot.feed_urls, self.test_config["feed_urls"])
|
self.assertEqual(bot.feed_urls, self.test_config["feed_urls"])
|
||||||
self.assertEqual(bot.toot_visibility, self.test_config["toot_visibility"])
|
self.assertEqual(bot.toot_visibility, self.test_config["toot_visibility"])
|
||||||
self.assertEqual(bot.check_interval, self.test_config["check_interval"])
|
self.assertEqual(bot.check_interval, self.test_config["check_interval"])
|
||||||
self.assertEqual(bot.state_file, self.test_config["state_file"])
|
self.assertEqual(bot.state_file, Path(self.test_config["state_file"]))
|
||||||
|
|
||||||
# Verify Mastodon client was initialized correctly
|
# Verify Mastodon client was initialized correctly
|
||||||
mock_mastodon.assert_called_once_with(
|
mock_mastodon.assert_called_once_with(
|
||||||
@@ -48,299 +49,102 @@ class TestMastodonRSSBot(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
@patch("bot.Mastodon")
|
@patch("bot.Mastodon")
|
||||||
def test_load_processed_entries_empty(self, mock_mastodon):
|
def test_save_processed_entries_error(self, mock_mastodon):
|
||||||
"""Test loading processed entries from non-existent file returns empty set"""
|
"""Test error handling when saving processed entries fails"""
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
bot = MastodonRSSBot(**self.test_config)
|
||||||
entries = bot.load_processed_entries()
|
|
||||||
|
# Mock Path.write_text to raise exception
|
||||||
self.assertEqual(entries, set())
|
with patch.object(Path, "write_text", side_effect=Exception("Disk full")):
|
||||||
self.assertIsInstance(entries, set)
|
# Should not raise exception
|
||||||
|
bot.save_processed_entries({"https://example.com/1"})
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_load_processed_entries_existing(self, mock_mastodon):
|
|
||||||
"""Test loading processed entries from existing file"""
|
|
||||||
# Create a temporary file with test data
|
|
||||||
test_urls = [
|
|
||||||
"https://example.com/1",
|
|
||||||
"https://example.com/2",
|
|
||||||
"https://example.com/3",
|
|
||||||
]
|
|
||||||
with open(self.test_config["state_file"], "w") as f:
|
|
||||||
f.write("\n".join(test_urls))
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
entries = bot.load_processed_entries()
|
|
||||||
|
|
||||||
self.assertEqual(entries, set(test_urls))
|
|
||||||
self.assertEqual(len(entries), 3)
|
|
||||||
|
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_save_processed_entries(self, mock_mastodon):
|
|
||||||
"""Test saving processed entries to file"""
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
test_entries = {
|
|
||||||
"https://example.com/1",
|
|
||||||
"https://example.com/2",
|
|
||||||
"https://example.com/3",
|
|
||||||
}
|
|
||||||
|
|
||||||
bot.save_processed_entries(test_entries)
|
|
||||||
|
|
||||||
# Verify file was created and contains correct data
|
|
||||||
self.assertTrue(os.path.exists(self.test_config["state_file"]))
|
|
||||||
|
|
||||||
with open(self.test_config["state_file"], "r") as f:
|
|
||||||
saved_entries = set(f.read().splitlines())
|
|
||||||
|
|
||||||
self.assertEqual(saved_entries, test_entries)
|
|
||||||
|
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_save_processed_entries_creates_directory(self, mock_mastodon):
|
|
||||||
"""Test that saving entries creates directory if it doesn't exist"""
|
|
||||||
# Use a path with a non-existent directory
|
|
||||||
test_dir = tempfile.mkdtemp()
|
|
||||||
nested_path = os.path.join(test_dir, "subdir", "state.txt")
|
|
||||||
self.test_config["state_file"] = nested_path
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
bot.save_processed_entries({"https://example.com/1"})
|
|
||||||
|
|
||||||
self.assertTrue(os.path.exists(nested_path))
|
|
||||||
|
|
||||||
# Cleanup
|
|
||||||
import shutil
|
|
||||||
|
|
||||||
shutil.rmtree(test_dir)
|
|
||||||
|
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_format_status(self, mock_mastodon):
|
|
||||||
"""Test status formatting from feed entry"""
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
|
|
||||||
entry = {"title": "Test Article", "link": "https://example.com/article"}
|
|
||||||
|
|
||||||
status = bot.format_status(entry)
|
|
||||||
expected = "\nTest Article\n\nhttps://example.com/article"
|
|
||||||
|
|
||||||
self.assertEqual(status, expected)
|
|
||||||
|
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_format_status_missing_title(self, mock_mastodon):
|
|
||||||
"""Test status formatting with missing title"""
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
|
|
||||||
entry = {"link": "https://example.com/article"}
|
|
||||||
status = bot.format_status(entry)
|
|
||||||
|
|
||||||
self.assertIn("Untitled", status)
|
|
||||||
self.assertIn("https://example.com/article", status)
|
|
||||||
|
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_post_to_mastodon_success(self, mock_mastodon):
|
|
||||||
"""Test successful posting to Mastodon"""
|
|
||||||
mock_instance = Mock()
|
|
||||||
mock_mastodon.return_value = mock_instance
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
result = bot.post_to_mastodon("Test status")
|
|
||||||
|
|
||||||
self.assertTrue(result)
|
|
||||||
mock_instance.status_post.assert_called_once_with(
|
|
||||||
"Test status", visibility=self.test_config["toot_visibility"]
|
|
||||||
)
|
|
||||||
|
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_post_to_mastodon_failure(self, mock_mastodon):
|
|
||||||
"""Test handling of Mastodon posting failure"""
|
|
||||||
mock_instance = Mock()
|
|
||||||
mock_instance.status_post.side_effect = Exception("API Error")
|
|
||||||
mock_mastodon.return_value = mock_instance
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
result = bot.post_to_mastodon("Test status")
|
|
||||||
|
|
||||||
self.assertFalse(result)
|
|
||||||
|
|
||||||
@patch("bot.feedparser.parse")
|
@patch("bot.feedparser.parse")
|
||||||
@patch("bot.Mastodon")
|
@patch("bot.Mastodon")
|
||||||
def test_parse_feed_success(self, mock_mastodon, mock_parse):
|
def test_parse_feed_bozo(self, mock_mastodon, mock_parse):
|
||||||
"""Test successful feed parsing"""
|
"""Test feed parsing with bozo exception (warning)"""
|
||||||
mock_feed = Mock()
|
mock_feed = Mock()
|
||||||
mock_feed.entries = [{"title": "Test", "link": "https://example.com"}]
|
mock_feed.bozo_exception = Exception("XML Error")
|
||||||
mock_parse.return_value = mock_feed
|
mock_parse.return_value = mock_feed
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
bot = MastodonRSSBot(**self.test_config)
|
||||||
feed = bot.parse_feed("https://example.com/feed.xml")
|
feed = bot.parse_feed("https://example.com/feed.xml")
|
||||||
|
|
||||||
self.assertIsNotNone(feed)
|
self.assertIsNotNone(feed)
|
||||||
mock_parse.assert_called_once_with("https://example.com/feed.xml")
|
# We can't easily assert the log/print was called without mocking logging,
|
||||||
|
# but execution flow is covered.
|
||||||
|
|
||||||
@patch("bot.feedparser.parse")
|
|
||||||
@patch("bot.Mastodon")
|
@patch("bot.Mastodon")
|
||||||
def test_parse_feed_with_exception(self, mock_mastodon, mock_parse):
|
def test_run_keyboard_interrupt(self, mock_mastodon):
|
||||||
"""Test feed parsing with exception"""
|
"""Test clean exit on KeyboardInterrupt"""
|
||||||
mock_parse.side_effect = Exception("Network error")
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
bot = MastodonRSSBot(**self.test_config)
|
||||||
feed = bot.parse_feed("https://example.com/feed.xml")
|
|
||||||
|
# Mock process_new_entries to raise KeyboardInterrupt
|
||||||
|
bot.process_new_entries = Mock(side_effect=KeyboardInterrupt)
|
||||||
|
|
||||||
|
# Should exit cleanly
|
||||||
|
bot.run()
|
||||||
|
bot.process_new_entries.assert_called_once()
|
||||||
|
|
||||||
self.assertIsNone(feed)
|
@patch("bot.time.sleep")
|
||||||
|
|
||||||
@patch("bot.feedparser.parse")
|
|
||||||
@patch("bot.Mastodon")
|
@patch("bot.Mastodon")
|
||||||
def test_process_new_entries_no_entries(self, mock_mastodon, mock_parse):
|
def test_run_exception_retry(self, mock_mastodon, mock_sleep):
|
||||||
"""Test processing when feed has no entries"""
|
"""Test retry logic on exception in main loop"""
|
||||||
mock_feed = Mock()
|
|
||||||
mock_feed.entries = []
|
|
||||||
mock_parse.return_value = mock_feed
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
bot = MastodonRSSBot(**self.test_config)
|
||||||
count = bot.process_new_entries()
|
|
||||||
|
# Raise exception once, then KeyboardInterrupt to exit loop
|
||||||
self.assertEqual(count, 0)
|
bot.process_new_entries = Mock(side_effect=[Exception("Network Error"), KeyboardInterrupt])
|
||||||
|
|
||||||
@patch("bot.feedparser.parse")
|
bot.run()
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_process_new_entries_all_new(self, mock_mastodon, mock_parse):
|
self.assertEqual(bot.process_new_entries.call_count, 2)
|
||||||
"""Test processing with all new entries"""
|
mock_sleep.assert_called_with(bot.check_interval)
|
||||||
# Mock feed with 3 entries
|
|
||||||
mock_feed = Mock()
|
|
||||||
mock_feed.entries = [
|
|
||||||
{"title": "Article 1", "link": "https://example.com/1"},
|
|
||||||
{"title": "Article 2", "link": "https://example.com/2"},
|
|
||||||
{"title": "Article 3", "link": "https://example.com/3"},
|
|
||||||
]
|
|
||||||
mock_parse.return_value = mock_feed
|
|
||||||
|
|
||||||
# Mock Mastodon instance
|
|
||||||
mock_instance = Mock()
|
|
||||||
mock_mastodon.return_value = mock_instance
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
count = bot.process_new_entries()
|
|
||||||
|
|
||||||
self.assertEqual(count, 3)
|
|
||||||
self.assertEqual(mock_instance.status_post.call_count, 3)
|
|
||||||
|
|
||||||
# Verify entries were saved
|
|
||||||
saved_entries = bot.load_processed_entries()
|
|
||||||
self.assertEqual(len(saved_entries), 3)
|
|
||||||
|
|
||||||
@patch("bot.feedparser.parse")
|
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_process_new_entries_multiple_feeds(self, mock_mastodon, mock_parse):
|
|
||||||
"""Test processing with multiple feeds"""
|
|
||||||
self.test_config["feed_urls"] = ["http://feed1.com", "http://feed2.com"]
|
|
||||||
|
|
||||||
def side_effect(url):
|
|
||||||
mock = Mock()
|
|
||||||
if url == "http://feed1.com":
|
|
||||||
mock.entries = [{"title": "1", "link": "http://link1.com"}]
|
|
||||||
else:
|
|
||||||
mock.entries = [{"title": "2", "link": "http://link2.com"}]
|
|
||||||
return mock
|
|
||||||
|
|
||||||
mock_parse.side_effect = side_effect
|
|
||||||
|
|
||||||
mock_instance = Mock()
|
|
||||||
mock_mastodon.return_value = mock_instance
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
count = bot.process_new_entries()
|
|
||||||
|
|
||||||
self.assertEqual(count, 2)
|
|
||||||
self.assertEqual(mock_parse.call_count, 2)
|
|
||||||
|
|
||||||
@patch("bot.feedparser.parse")
|
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_process_new_entries_some_processed(self, mock_mastodon, mock_parse):
|
|
||||||
"""Test processing with some entries already processed"""
|
|
||||||
# Pre-populate processed entries
|
|
||||||
processed = {"https://example.com/1", "https://example.com/2"}
|
|
||||||
with open(self.test_config["state_file"], "w") as f:
|
|
||||||
f.write("\n".join(processed))
|
|
||||||
|
|
||||||
# Mock feed with 4 entries (2 old, 2 new)
|
|
||||||
mock_feed = Mock()
|
|
||||||
mock_feed.entries = [
|
|
||||||
{
|
|
||||||
"title": "Article 1",
|
|
||||||
"link": "https://example.com/1",
|
|
||||||
}, # Already processed
|
|
||||||
{
|
|
||||||
"title": "Article 2",
|
|
||||||
"link": "https://example.com/2",
|
|
||||||
}, # Already processed
|
|
||||||
{"title": "Article 3", "link": "https://example.com/3"}, # New
|
|
||||||
{"title": "Article 4", "link": "https://example.com/4"}, # New
|
|
||||||
]
|
|
||||||
mock_parse.return_value = mock_feed
|
|
||||||
|
|
||||||
# Mock Mastodon instance
|
|
||||||
mock_instance = Mock()
|
|
||||||
mock_mastodon.return_value = mock_instance
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
count = bot.process_new_entries()
|
|
||||||
|
|
||||||
# Should only post 2 new entries
|
|
||||||
self.assertEqual(count, 2)
|
|
||||||
self.assertEqual(mock_instance.status_post.call_count, 2)
|
|
||||||
|
|
||||||
# Verify all 4 entries are now in processed list
|
|
||||||
saved_entries = bot.load_processed_entries()
|
|
||||||
self.assertEqual(len(saved_entries), 4)
|
|
||||||
|
|
||||||
@patch("bot.feedparser.parse")
|
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_process_new_entries_skip_no_url(self, mock_mastodon, mock_parse):
|
|
||||||
"""Test that entries without URLs are skipped"""
|
|
||||||
mock_feed = Mock()
|
|
||||||
mock_feed.entries = [
|
|
||||||
{"title": "Article without URL"}, # No link field
|
|
||||||
{"title": "Article with URL", "link": "https://example.com/1"},
|
|
||||||
]
|
|
||||||
mock_parse.return_value = mock_feed
|
|
||||||
|
|
||||||
mock_instance = Mock()
|
|
||||||
mock_mastodon.return_value = mock_instance
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
count = bot.process_new_entries()
|
|
||||||
|
|
||||||
# Should only process 1 entry (the one with URL)
|
|
||||||
self.assertEqual(count, 1)
|
|
||||||
self.assertEqual(mock_instance.status_post.call_count, 1)
|
|
||||||
|
|
||||||
@patch("bot.feedparser.parse")
|
|
||||||
@patch("bot.Mastodon")
|
|
||||||
def test_process_new_entries_posting_failure(self, mock_mastodon, mock_parse):
|
|
||||||
"""Test that failed posts don't get marked as processed"""
|
|
||||||
mock_feed = Mock()
|
|
||||||
mock_feed.entries = [
|
|
||||||
{"title": "Article 1", "link": "https://example.com/1"},
|
|
||||||
]
|
|
||||||
mock_parse.return_value = mock_feed
|
|
||||||
|
|
||||||
# Mock Mastodon to fail
|
|
||||||
mock_instance = Mock()
|
|
||||||
mock_instance.status_post.side_effect = Exception("API Error")
|
|
||||||
mock_mastodon.return_value = mock_instance
|
|
||||||
|
|
||||||
bot = MastodonRSSBot(**self.test_config)
|
|
||||||
count = bot.process_new_entries()
|
|
||||||
|
|
||||||
# No entries should be counted as posted
|
|
||||||
self.assertEqual(count, 0)
|
|
||||||
|
|
||||||
# Entry should not be marked as processed
|
|
||||||
saved_entries = bot.load_processed_entries()
|
|
||||||
self.assertEqual(len(saved_entries), 0)
|
|
||||||
|
|
||||||
|
|
||||||
class TestMainEntry(unittest.TestCase):
|
class TestMainEntry(unittest.TestCase):
|
||||||
"""Test cases for main.py entry point"""
|
"""Test cases for main.py entry point"""
|
||||||
|
|
||||||
|
@patch.dict(os.environ, {}, clear=True)
|
||||||
|
def test_config_missing_vars(self):
|
||||||
|
"""Test Config raises ValueError when env vars are missing"""
|
||||||
|
from main import Config
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
Config.from_env()
|
||||||
|
|
||||||
|
@patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{
|
||||||
|
"MASTODON_CLIENT_ID": "id",
|
||||||
|
"MASTODON_CLIENT_SECRET": "secret",
|
||||||
|
"MASTODON_ACCESS_TOKEN": "token",
|
||||||
|
"MASTODON_INSTANCE_URL": "url",
|
||||||
|
# No feed urls
|
||||||
|
},
|
||||||
|
)
|
||||||
|
def test_config_no_feeds(self):
|
||||||
|
"""Test Config raises ValueError when no feeds are configured"""
|
||||||
|
from main import Config
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
Config.from_env()
|
||||||
|
|
||||||
|
@patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{
|
||||||
|
"MASTODON_CLIENT_ID": "id",
|
||||||
|
"MASTODON_CLIENT_SECRET": "secret",
|
||||||
|
"MASTODON_ACCESS_TOKEN": "token",
|
||||||
|
"MASTODON_INSTANCE_URL": "url",
|
||||||
|
"FEEDS_FILE": "nonexistent.txt",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
def test_config_feed_file_error(self):
|
||||||
|
"""Test Config handles missing/bad feeds file gracefully (logs warning but continues check)"""
|
||||||
|
from main import Config
|
||||||
|
# Should raise ValueError ultimately because no feeds are found,
|
||||||
|
# but cover the file reading path
|
||||||
|
with self.assertRaises(ValueError) as cm:
|
||||||
|
Config.from_env()
|
||||||
|
self.assertIn("No RSS feeds configured", str(cm.exception))
|
||||||
|
|
||||||
@patch.dict(
|
@patch.dict(
|
||||||
os.environ,
|
os.environ,
|
||||||
{
|
{
|
||||||
@@ -377,7 +181,7 @@ class TestMainEntry(unittest.TestCase):
|
|||||||
feed_urls=["https://example.com/feed.xml"],
|
feed_urls=["https://example.com/feed.xml"],
|
||||||
toot_visibility="unlisted",
|
toot_visibility="unlisted",
|
||||||
check_interval=120,
|
check_interval=120,
|
||||||
state_file="/tmp/test_state.txt",
|
state_file=Path("/tmp/test_state.txt"),
|
||||||
)
|
)
|
||||||
|
|
||||||
@patch.dict(
|
@patch.dict(
|
||||||
|
|||||||
Reference in New Issue
Block a user