Mirror of https://github.com/aserper/masto-rss.git (synced 2025-12-17 05:15:25 +00:00)
Merge pull request #6 from aserper/modernization
Modernize codebase: Use pathlib, logging, dataclasses, and update dep…
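The diff below is mechanical but broad: ad-hoc `print` calls become `logging` calls, string paths become `pathlib.Path` objects, and environment parsing moves into a dataclass. A minimal, self-contained sketch of how those three pieces typically fit together (illustrative only; `ExampleConfig` and `save_state` are made up for this note, not code from the repo):

```python
# Illustrative sketch of the patterns this commit applies; not code from the repo.
import logging
import os
from dataclasses import dataclass
from pathlib import Path

# Library modules create a logger; the entry point configures handlers/levels.
logger = logging.getLogger(__name__)


@dataclass
class ExampleConfig:
    """Configuration loaded from environment variables (hypothetical example)."""
    instance_url: str
    state_file: Path = Path("/state/processed_entries.txt")

    @classmethod
    def from_env(cls) -> "ExampleConfig":
        url = os.environ.get("MASTODON_INSTANCE_URL")
        if not url:
            raise ValueError("Missing required environment variable: MASTODON_INSTANCE_URL")
        return cls(instance_url=url)


def save_state(state_file: Path, entries: set) -> None:
    # pathlib replaces os.makedirs()/open(): ensure the parent dir, then write the file.
    state_file.parent.mkdir(parents=True, exist_ok=True)
    state_file.write_text("\n".join(sorted(entries)))
    logger.info(f"Saved {len(entries)} processed entries to {state_file}")
```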
README.md
@@ -9,6 +9,7 @@
[](LICENSE)
[](https://www.python.org/downloads/)
[](https://github.com/aserper/masto-rss)
[](https://github.com/aserper/masto-rss/network)

A simple, lightweight Mastodon bot that automatically posts updates from RSS feeds to the Fediverse. Built with Python and designed to run seamlessly in Docker with multiarch support (amd64 & arm64).

bot.py (66 changed lines)
@@ -1,10 +1,16 @@
"""Mastodon RSS Bot - Core functionality"""

import logging
import os
import time
from pathlib import Path
from typing import List, Optional, Set

import feedparser
from mastodon import Mastodon
import os
import time
from typing import Set, Optional

# Configure logging for this module
logger = logging.getLogger(__name__)


class MastodonRSSBot:
@@ -16,10 +22,10 @@ class MastodonRSSBot:
        client_secret: str,
        access_token: str,
        instance_url: str,
        feed_urls: list[str],
        feed_urls: List[str],
        toot_visibility: str = "public",
        check_interval: int = 300,
        state_file: str = "/state/processed_entries.txt",
        state_file: Path = Path("/state/processed_entries.txt"),
    ):
        """
        Initialize the Mastodon RSS bot.
@@ -37,7 +43,7 @@ class MastodonRSSBot:
        self.feed_urls = feed_urls
        self.toot_visibility = toot_visibility
        self.check_interval = check_interval
        self.state_file = state_file
        self.state_file = Path(state_file)

        # Initialize Mastodon client
        self.mastodon = Mastodon(
@@ -54,10 +60,13 @@ class MastodonRSSBot:
        Returns:
            Set of URLs that have been processed
        """
        if not self.state_file.exists():
            return set()

        try:
            with open(self.state_file, "r") as file:
                return set(file.read().splitlines())
        except FileNotFoundError:
            return set(self.state_file.read_text().splitlines())
        except Exception as e:
            logger.error(f"Error loading processed entries from {self.state_file}: {e}")
            return set()

    def save_processed_entries(self, processed_entries: Set[str]) -> None:
@@ -68,10 +77,11 @@ class MastodonRSSBot:
            processed_entries: Set of processed entry URLs
        """
        # Ensure directory exists
        os.makedirs(os.path.dirname(self.state_file), exist_ok=True)

        with open(self.state_file, "w") as file:
            file.write("\n".join(sorted(processed_entries)))
        try:
            self.state_file.parent.mkdir(parents=True, exist_ok=True)
            self.state_file.write_text("\n".join(sorted(processed_entries)))
        except Exception as e:
            logger.error(f"Error saving processed entries to {self.state_file}: {e}")

    def parse_feed(self, feed_url: str) -> Optional[feedparser.FeedParserDict]:
        """
@@ -86,12 +96,12 @@ class MastodonRSSBot:
        try:
            feed = feedparser.parse(feed_url)
            if hasattr(feed, "bozo_exception"):
                print(
                    f"Warning: Feed parsing issue for {feed_url}: {feed.bozo_exception}"
                logger.warning(
                    f"Feed parsing issue for {feed_url}: {feed.bozo_exception}"
                )
            return feed
        except Exception as e:
            print(f"Error parsing feed {feed_url}: {e}")
            logger.error(f"Error parsing feed {feed_url}: {e}")
            return None

    def format_status(self, entry: feedparser.FeedParserDict) -> str:
@@ -122,7 +132,7 @@ class MastodonRSSBot:
            self.mastodon.status_post(status, visibility=self.toot_visibility)
            return True
        except Exception as e:
            print(f"Error posting to Mastodon: {e}")
            logger.error(f"Error posting to Mastodon: {e}")
            return False

    def process_feed(self, feed_url: str, processed_entries: Set[str]) -> int:
@@ -136,10 +146,10 @@ class MastodonRSSBot:
        Returns:
            Number of new entries posted
        """
        print(f"Checking feed: {feed_url}")
        logger.info(f"Checking feed: {feed_url}")
        feed = self.parse_feed(feed_url)
        if not feed or not hasattr(feed, "entries"):
            print(f"No entries found in feed: {feed_url}")
            logger.warning(f"No entries found in feed: {feed_url}")
            return 0

        new_entries_count = 0
@@ -149,13 +159,13 @@ class MastodonRSSBot:
            entry_url = entry.get("link", "")

            if not entry_url:
                print("Skipping entry without URL")
                logger.debug("Skipping entry without URL")
                continue

            # Check if entry is new
            if entry_url not in processed_entries:
                title = entry.get("title", "Untitled")
                print(f"Found a new RSS item: {title}")
                logger.info(f"Found a new RSS item: {title}")

                # Format and post status
                status = self.format_status(entry)
@@ -163,7 +173,7 @@ class MastodonRSSBot:
                    processed_entries.add(entry_url)
                    new_entries_count += 1
                else:
                    print(f"Failed to post entry: {title}")
                    logger.error(f"Failed to post entry: {title}")

        return new_entries_count

@@ -174,7 +184,7 @@ class MastodonRSSBot:
        Returns:
            Total number of new entries posted across all feeds
        """
        print("Checking for new RSS items...")
        logger.info("Checking for new RSS items...")

        # Load processed entries
        processed_entries = self.load_processed_entries()
@@ -198,15 +208,15 @@ class MastodonRSSBot:
            try:
                count = self.process_new_entries()
                if count > 0:
                    print(f"Posted {count} new entries")
                    logger.info(f"Posted {count} new entries")

                print(f"Sleeping for {self.check_interval} seconds...")
                logger.info(f"Sleeping for {self.check_interval} seconds...")
                time.sleep(self.check_interval)

            except KeyboardInterrupt:
                print("\nBot stopped by user")
                logger.info("Bot stopped by user")
                break
            except Exception as e:
                print(f"Error in main loop: {e}")
                print(f"Retrying in {self.check_interval} seconds...")
                logger.error(f"Error in main loop: {e}", exc_info=True)
                logger.info(f"Retrying in {self.check_interval} seconds...")
                time.sleep(self.check_interval)

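One detail worth calling out in the bot.py changes above: the module only creates a logger with `logging.getLogger(__name__)` and never calls `logging.basicConfig`; the handler, level, and format are configured once in main.py (next section). A small standalone sketch of that split, illustrative rather than repo code:

```python
# Sketch of the logging split used above (illustrative): the library module only
# asks for a logger, and the entry point decides where records actually go.
import logging
import sys

logger = logging.getLogger(__name__)  # bot.py-style: no handlers attached here


def check_feed(url: str) -> None:
    # Emitted only if the application configured logging; otherwise Python's
    # "last resort" handler shows WARNING and above only.
    logger.info(f"Checking feed: {url}")


if __name__ == "__main__":
    # main.py-style: configure once at the entry point.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )
    check_feed("https://example.com/feed.xml")
```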
main.py (185 changed lines)
@@ -1,80 +1,135 @@
"""Mastodon RSS Bot - Entry point"""

import logging
import os
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional

from bot import MastodonRSSBot

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)


@dataclass
class Config:
    """Configuration loaded from environment variables."""
    instance_url: str
    client_id: str
    client_secret: str
    access_token: str
    feed_urls: List[str] = field(default_factory=list)
    toot_visibility: str = "public"
    check_interval: int = 300
    state_file: Path = field(default_factory=lambda: Path("/state/processed_entries.txt"))

    @classmethod
    def from_env(cls) -> "Config":
        """Load configuration from environment variables."""
        instance_url = os.environ.get("MASTODON_INSTANCE_URL")
        client_id = os.environ.get("MASTODON_CLIENT_ID")
        client_secret = os.environ.get("MASTODON_CLIENT_SECRET")
        access_token = os.environ.get("MASTODON_ACCESS_TOKEN")

        if not all([instance_url, client_id, client_secret, access_token]):
            missing = [
                k for k, v in {
                    "MASTODON_INSTANCE_URL": instance_url,
                    "MASTODON_CLIENT_ID": client_id,
                    "MASTODON_CLIENT_SECRET": client_secret,
                    "MASTODON_ACCESS_TOKEN": access_token
                }.items() if not v
            ]
            raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

        # Parse feeds
        feed_urls = []

        # 1. Legacy single feed URL
        if os.environ.get("RSS_FEED_URL"):
            feed_urls.append(os.environ["RSS_FEED_URL"])

        # 2. Comma-separated list of feeds
        if os.environ.get("RSS_FEEDS"):
            feeds = [
                url.strip() for url in os.environ["RSS_FEEDS"].split(",") if url.strip()
            ]
            feed_urls.extend(feeds)

        # 3. File containing list of feeds
        feeds_file = os.environ.get("FEEDS_FILE")
        if feeds_file:
            path = Path(feeds_file)
            if path.exists():
                try:
                    content = path.read_text().splitlines()
                    file_feeds = [
                        line.strip()
                        for line in content
                        if line.strip() and not line.startswith("#")
                    ]
                    feed_urls.extend(file_feeds)
                except Exception as e:
                    logger.error(f"Error reading feeds file {feeds_file}: {e}")
            else:
                logger.warning(f"Feeds file configured but not found: {feeds_file}")

        # Deduplicate while preserving order
        unique_feed_urls = list(dict.fromkeys(feed_urls))

        if not unique_feed_urls:
            raise ValueError("No RSS feeds configured. Please set RSS_FEED_URL, RSS_FEEDS, or FEEDS_FILE.")

        return cls(
            instance_url=instance_url, # type: ignore # checked above
            client_id=client_id, # type: ignore
            client_secret=client_secret,# type: ignore
            access_token=access_token, # type: ignore
            feed_urls=unique_feed_urls,
            toot_visibility=os.environ.get("TOOT_VISIBILITY", "public"),
            check_interval=int(os.environ.get("CHECK_INTERVAL", "300")),
            state_file=Path(os.environ.get("PROCESSED_ENTRIES_FILE", "/state/processed_entries.txt"))
        )


def main():
    """Initialize and run the bot with environment configuration"""
    print("Starting Mastodon RSS Bot...")
    logger.info("Starting Mastodon RSS Bot...")

    # Load configuration from environment variables
    feed_urls = []
    try:
        config = Config.from_env()
    except ValueError as e:
        logger.critical(str(e))
        sys.exit(1)
    except Exception as e:
        logger.critical(f"Failed to load configuration: {e}")
        sys.exit(1)

    # 1. Legacy single feed URL
    if os.environ.get("RSS_FEED_URL"):
        feed_urls.append(os.environ["RSS_FEED_URL"])

    # 2. Comma-separated list of feeds
    if os.environ.get("RSS_FEEDS"):
        feeds = [
            url.strip() for url in os.environ["RSS_FEEDS"].split(",") if url.strip()
        ]
        feed_urls.extend(feeds)

    # 3. File containing list of feeds
    feeds_file = os.environ.get("FEEDS_FILE")
    if feeds_file and os.path.exists(feeds_file):
        try:
            with open(feeds_file, "r") as f:
                file_feeds = [
                    line.strip()
                    for line in f
                    if line.strip() and not line.startswith("#")
                ]
            feed_urls.extend(file_feeds)
        except Exception as e:
            print(f"Error reading feeds file {feeds_file}: {e}")

    # Deduplicate while preserving order
    unique_feed_urls = []
    seen = set()
    for url in feed_urls:
        if url not in seen:
            unique_feed_urls.append(url)
            seen.add(url)

    if not unique_feed_urls:
        print(
            "Error: No RSS feeds configured. Please set RSS_FEED_URL, RSS_FEEDS, or FEEDS_FILE."
        )
        return
    logger.info("Bot configured successfully:")
    logger.info(f" Instance: {config.instance_url}")
    logger.info(f" Monitoring {len(config.feed_urls)} feed(s):")
    for url in config.feed_urls:
        logger.info(f" - {url}")
    logger.info(f" Visibility: {config.toot_visibility}")
    logger.info(f" Check interval: {config.check_interval} seconds")
    logger.info(f" State file: {config.state_file}")

    bot = MastodonRSSBot(
        client_id=os.environ["MASTODON_CLIENT_ID"],
        client_secret=os.environ["MASTODON_CLIENT_SECRET"],
        access_token=os.environ["MASTODON_ACCESS_TOKEN"],
        instance_url=os.environ["MASTODON_INSTANCE_URL"],
        feed_urls=unique_feed_urls,
        toot_visibility=os.environ.get("TOOT_VISIBILITY", "public"),
        check_interval=int(os.environ.get("CHECK_INTERVAL", "300")),
        state_file=os.environ.get(
            "PROCESSED_ENTRIES_FILE", "/state/processed_entries.txt"
        ),
        client_id=config.client_id,
        client_secret=config.client_secret,
        access_token=config.access_token,
        instance_url=config.instance_url,
        feed_urls=config.feed_urls,
        toot_visibility=config.toot_visibility,
        check_interval=config.check_interval,
        state_file=config.state_file,
    )

    print("Bot configured successfully:")
    print(f" Instance: {os.environ['MASTODON_INSTANCE_URL']}")
    print(f" Monitoring {len(unique_feed_urls)} feed(s):")
    for url in unique_feed_urls:
        print(f" - {url}")
    print(f" Visibility: {os.environ.get('TOOT_VISIBILITY', 'public')}")
    print(f" Check interval: {os.environ.get('CHECK_INTERVAL', '300')} seconds")
    print(
        f" State file: {os.environ.get('PROCESSED_ENTRIES_FILE', '/state/processed_entries.txt')}"
    )
    print()

    # Start the bot
    bot.run()

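The feed-gathering behaviour is unchanged between the old and new `main()`: the same three sources (RSS_FEED_URL, RSS_FEEDS, FEEDS_FILE) are merged, and duplicates are dropped in order. What the rewrite changes is where that logic lives (`Config.from_env`) and how deduplication is written (`dict.fromkeys` instead of a manual `seen` set). A small standalone sketch of the merge, with made-up URLs and environment values:

```python
# Illustrative sketch of the feed merging done in Config.from_env above.
# The URLs and environment values here are placeholders, not from the repo.
import os

os.environ["RSS_FEED_URL"] = "https://a.example/feed"  # legacy single-feed variable
os.environ["RSS_FEEDS"] = "https://a.example/feed, https://b.example/feed"

feed_urls = []
if os.environ.get("RSS_FEED_URL"):
    feed_urls.append(os.environ["RSS_FEED_URL"])
if os.environ.get("RSS_FEEDS"):
    feed_urls.extend(u.strip() for u in os.environ["RSS_FEEDS"].split(",") if u.strip())

# dict.fromkeys keeps the first occurrence and preserves insertion order (Python 3.7+).
unique_feed_urls = list(dict.fromkeys(feed_urls))
print(unique_feed_urls)  # ['https://a.example/feed', 'https://b.example/feed']
```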
test_bot.py (350 changed lines)
@@ -4,6 +4,7 @@ import unittest
from unittest.mock import Mock, patch, mock_open, MagicMock
import tempfile
import os
from pathlib import Path
from bot import MastodonRSSBot
import feedparser

@@ -37,7 +38,7 @@ class TestMastodonRSSBot(unittest.TestCase):
        self.assertEqual(bot.feed_urls, self.test_config["feed_urls"])
        self.assertEqual(bot.toot_visibility, self.test_config["toot_visibility"])
        self.assertEqual(bot.check_interval, self.test_config["check_interval"])
        self.assertEqual(bot.state_file, self.test_config["state_file"])
        self.assertEqual(bot.state_file, Path(self.test_config["state_file"]))

        # Verify Mastodon client was initialized correctly
        mock_mastodon.assert_called_once_with(
@@ -48,299 +49,102 @@ class TestMastodonRSSBot(unittest.TestCase):
        )

    @patch("bot.Mastodon")
    def test_load_processed_entries_empty(self, mock_mastodon):
        """Test loading processed entries from non-existent file returns empty set"""
    def test_save_processed_entries_error(self, mock_mastodon):
        """Test error handling when saving processed entries fails"""
        bot = MastodonRSSBot(**self.test_config)
        entries = bot.load_processed_entries()

        self.assertEqual(entries, set())
        self.assertIsInstance(entries, set)

    @patch("bot.Mastodon")
    def test_load_processed_entries_existing(self, mock_mastodon):
        """Test loading processed entries from existing file"""
        # Create a temporary file with test data
        test_urls = [
            "https://example.com/1",
            "https://example.com/2",
            "https://example.com/3",
        ]
        with open(self.test_config["state_file"], "w") as f:
            f.write("\n".join(test_urls))

        bot = MastodonRSSBot(**self.test_config)
        entries = bot.load_processed_entries()

        self.assertEqual(entries, set(test_urls))
        self.assertEqual(len(entries), 3)

    @patch("bot.Mastodon")
    def test_save_processed_entries(self, mock_mastodon):
        """Test saving processed entries to file"""
        bot = MastodonRSSBot(**self.test_config)
        test_entries = {
            "https://example.com/1",
            "https://example.com/2",
            "https://example.com/3",
        }

        bot.save_processed_entries(test_entries)

        # Verify file was created and contains correct data
        self.assertTrue(os.path.exists(self.test_config["state_file"]))

        with open(self.test_config["state_file"], "r") as f:
            saved_entries = set(f.read().splitlines())

        self.assertEqual(saved_entries, test_entries)

    @patch("bot.Mastodon")
    def test_save_processed_entries_creates_directory(self, mock_mastodon):
        """Test that saving entries creates directory if it doesn't exist"""
        # Use a path with a non-existent directory
        test_dir = tempfile.mkdtemp()
        nested_path = os.path.join(test_dir, "subdir", "state.txt")
        self.test_config["state_file"] = nested_path

        bot = MastodonRSSBot(**self.test_config)
        bot.save_processed_entries({"https://example.com/1"})

        self.assertTrue(os.path.exists(nested_path))

        # Cleanup
        import shutil

        shutil.rmtree(test_dir)

    @patch("bot.Mastodon")
    def test_format_status(self, mock_mastodon):
        """Test status formatting from feed entry"""
        bot = MastodonRSSBot(**self.test_config)

        entry = {"title": "Test Article", "link": "https://example.com/article"}

        status = bot.format_status(entry)
        expected = "\nTest Article\n\nhttps://example.com/article"

        self.assertEqual(status, expected)

    @patch("bot.Mastodon")
    def test_format_status_missing_title(self, mock_mastodon):
        """Test status formatting with missing title"""
        bot = MastodonRSSBot(**self.test_config)

        entry = {"link": "https://example.com/article"}
        status = bot.format_status(entry)

        self.assertIn("Untitled", status)
        self.assertIn("https://example.com/article", status)

    @patch("bot.Mastodon")
    def test_post_to_mastodon_success(self, mock_mastodon):
        """Test successful posting to Mastodon"""
        mock_instance = Mock()
        mock_mastodon.return_value = mock_instance

        bot = MastodonRSSBot(**self.test_config)
        result = bot.post_to_mastodon("Test status")

        self.assertTrue(result)
        mock_instance.status_post.assert_called_once_with(
            "Test status", visibility=self.test_config["toot_visibility"]
        )

    @patch("bot.Mastodon")
    def test_post_to_mastodon_failure(self, mock_mastodon):
        """Test handling of Mastodon posting failure"""
        mock_instance = Mock()
        mock_instance.status_post.side_effect = Exception("API Error")
        mock_mastodon.return_value = mock_instance

        bot = MastodonRSSBot(**self.test_config)
        result = bot.post_to_mastodon("Test status")

        self.assertFalse(result)

        # Mock Path.write_text to raise exception
        with patch.object(Path, "write_text", side_effect=Exception("Disk full")):
            # Should not raise exception
            bot.save_processed_entries({"https://example.com/1"})

    @patch("bot.feedparser.parse")
    @patch("bot.Mastodon")
    def test_parse_feed_success(self, mock_mastodon, mock_parse):
        """Test successful feed parsing"""
    def test_parse_feed_bozo(self, mock_mastodon, mock_parse):
        """Test feed parsing with bozo exception (warning)"""
        mock_feed = Mock()
        mock_feed.entries = [{"title": "Test", "link": "https://example.com"}]
        mock_feed.bozo_exception = Exception("XML Error")
        mock_parse.return_value = mock_feed

        bot = MastodonRSSBot(**self.test_config)
        feed = bot.parse_feed("https://example.com/feed.xml")

        self.assertIsNotNone(feed)
        mock_parse.assert_called_once_with("https://example.com/feed.xml")
        # We can't easily assert the log/print was called without mocking logging,
        # but execution flow is covered.

    @patch("bot.feedparser.parse")
    @patch("bot.Mastodon")
    def test_parse_feed_with_exception(self, mock_mastodon, mock_parse):
        """Test feed parsing with exception"""
        mock_parse.side_effect = Exception("Network error")

    def test_run_keyboard_interrupt(self, mock_mastodon):
        """Test clean exit on KeyboardInterrupt"""
        bot = MastodonRSSBot(**self.test_config)
        feed = bot.parse_feed("https://example.com/feed.xml")

        # Mock process_new_entries to raise KeyboardInterrupt
        bot.process_new_entries = Mock(side_effect=KeyboardInterrupt)

        # Should exit cleanly
        bot.run()
        bot.process_new_entries.assert_called_once()

        self.assertIsNone(feed)

    @patch("bot.feedparser.parse")
    @patch("bot.time.sleep")
    @patch("bot.Mastodon")
    def test_process_new_entries_no_entries(self, mock_mastodon, mock_parse):
        """Test processing when feed has no entries"""
        mock_feed = Mock()
        mock_feed.entries = []
        mock_parse.return_value = mock_feed

    def test_run_exception_retry(self, mock_mastodon, mock_sleep):
        """Test retry logic on exception in main loop"""
        bot = MastodonRSSBot(**self.test_config)
        count = bot.process_new_entries()

        self.assertEqual(count, 0)

    @patch("bot.feedparser.parse")
    @patch("bot.Mastodon")
    def test_process_new_entries_all_new(self, mock_mastodon, mock_parse):
        """Test processing with all new entries"""
        # Mock feed with 3 entries
        mock_feed = Mock()
        mock_feed.entries = [
            {"title": "Article 1", "link": "https://example.com/1"},
            {"title": "Article 2", "link": "https://example.com/2"},
            {"title": "Article 3", "link": "https://example.com/3"},
        ]
        mock_parse.return_value = mock_feed

        # Mock Mastodon instance
        mock_instance = Mock()
        mock_mastodon.return_value = mock_instance

        bot = MastodonRSSBot(**self.test_config)
        count = bot.process_new_entries()

        self.assertEqual(count, 3)
        self.assertEqual(mock_instance.status_post.call_count, 3)

        # Verify entries were saved
        saved_entries = bot.load_processed_entries()
        self.assertEqual(len(saved_entries), 3)

    @patch("bot.feedparser.parse")
    @patch("bot.Mastodon")
    def test_process_new_entries_multiple_feeds(self, mock_mastodon, mock_parse):
        """Test processing with multiple feeds"""
        self.test_config["feed_urls"] = ["http://feed1.com", "http://feed2.com"]

        def side_effect(url):
            mock = Mock()
            if url == "http://feed1.com":
                mock.entries = [{"title": "1", "link": "http://link1.com"}]
            else:
                mock.entries = [{"title": "2", "link": "http://link2.com"}]
            return mock

        mock_parse.side_effect = side_effect

        mock_instance = Mock()
        mock_mastodon.return_value = mock_instance

        bot = MastodonRSSBot(**self.test_config)
        count = bot.process_new_entries()

        self.assertEqual(count, 2)
        self.assertEqual(mock_parse.call_count, 2)

    @patch("bot.feedparser.parse")
    @patch("bot.Mastodon")
    def test_process_new_entries_some_processed(self, mock_mastodon, mock_parse):
        """Test processing with some entries already processed"""
        # Pre-populate processed entries
        processed = {"https://example.com/1", "https://example.com/2"}
        with open(self.test_config["state_file"], "w") as f:
            f.write("\n".join(processed))

        # Mock feed with 4 entries (2 old, 2 new)
        mock_feed = Mock()
        mock_feed.entries = [
            {
                "title": "Article 1",
                "link": "https://example.com/1",
            }, # Already processed
            {
                "title": "Article 2",
                "link": "https://example.com/2",
            }, # Already processed
            {"title": "Article 3", "link": "https://example.com/3"}, # New
            {"title": "Article 4", "link": "https://example.com/4"}, # New
        ]
        mock_parse.return_value = mock_feed

        # Mock Mastodon instance
        mock_instance = Mock()
        mock_mastodon.return_value = mock_instance

        bot = MastodonRSSBot(**self.test_config)
        count = bot.process_new_entries()

        # Should only post 2 new entries
        self.assertEqual(count, 2)
        self.assertEqual(mock_instance.status_post.call_count, 2)

        # Verify all 4 entries are now in processed list
        saved_entries = bot.load_processed_entries()
        self.assertEqual(len(saved_entries), 4)

    @patch("bot.feedparser.parse")
    @patch("bot.Mastodon")
    def test_process_new_entries_skip_no_url(self, mock_mastodon, mock_parse):
        """Test that entries without URLs are skipped"""
        mock_feed = Mock()
        mock_feed.entries = [
            {"title": "Article without URL"}, # No link field
            {"title": "Article with URL", "link": "https://example.com/1"},
        ]
        mock_parse.return_value = mock_feed

        mock_instance = Mock()
        mock_mastodon.return_value = mock_instance

        bot = MastodonRSSBot(**self.test_config)
        count = bot.process_new_entries()

        # Should only process 1 entry (the one with URL)
        self.assertEqual(count, 1)
        self.assertEqual(mock_instance.status_post.call_count, 1)

    @patch("bot.feedparser.parse")
    @patch("bot.Mastodon")
    def test_process_new_entries_posting_failure(self, mock_mastodon, mock_parse):
        """Test that failed posts don't get marked as processed"""
        mock_feed = Mock()
        mock_feed.entries = [
            {"title": "Article 1", "link": "https://example.com/1"},
        ]
        mock_parse.return_value = mock_feed

        # Mock Mastodon to fail
        mock_instance = Mock()
        mock_instance.status_post.side_effect = Exception("API Error")
        mock_mastodon.return_value = mock_instance

        bot = MastodonRSSBot(**self.test_config)
        count = bot.process_new_entries()

        # No entries should be counted as posted
        self.assertEqual(count, 0)

        # Entry should not be marked as processed
        saved_entries = bot.load_processed_entries()
        self.assertEqual(len(saved_entries), 0)

        # Raise exception once, then KeyboardInterrupt to exit loop
        bot.process_new_entries = Mock(side_effect=[Exception("Network Error"), KeyboardInterrupt])

        bot.run()

        self.assertEqual(bot.process_new_entries.call_count, 2)
        mock_sleep.assert_called_with(bot.check_interval)


class TestMainEntry(unittest.TestCase):
    """Test cases for main.py entry point"""

    @patch.dict(os.environ, {}, clear=True)
    def test_config_missing_vars(self):
        """Test Config raises ValueError when env vars are missing"""
        from main import Config
        with self.assertRaises(ValueError):
            Config.from_env()

    @patch.dict(
        os.environ,
        {
            "MASTODON_CLIENT_ID": "id",
            "MASTODON_CLIENT_SECRET": "secret",
            "MASTODON_ACCESS_TOKEN": "token",
            "MASTODON_INSTANCE_URL": "url",
            # No feed urls
        },
    )
    def test_config_no_feeds(self):
        """Test Config raises ValueError when no feeds are configured"""
        from main import Config
        with self.assertRaises(ValueError):
            Config.from_env()

    @patch.dict(
        os.environ,
        {
            "MASTODON_CLIENT_ID": "id",
            "MASTODON_CLIENT_SECRET": "secret",
            "MASTODON_ACCESS_TOKEN": "token",
            "MASTODON_INSTANCE_URL": "url",
            "FEEDS_FILE": "nonexistent.txt",
        },
    )
    def test_config_feed_file_error(self):
        """Test Config handles missing/bad feeds file gracefully (logs warning but continues check)"""
        from main import Config
        # Should raise ValueError ultimately because no feeds are found,
        # but cover the file reading path
        with self.assertRaises(ValueError) as cm:
            Config.from_env()
        self.assertIn("No RSS feeds configured", str(cm.exception))

    @patch.dict(
        os.environ,
        {
@@ -377,7 +181,7 @@ class TestMainEntry(unittest.TestCase):
            feed_urls=["https://example.com/feed.xml"],
            toot_visibility="unlisted",
            check_interval=120,
            state_file="/tmp/test_state.txt",
            state_file=Path("/tmp/test_state.txt"),
        )

    @patch.dict(