|
|
@@ -0,0 +1,93 @@
|
|
|
+import os
|
|
|
+import time
|
|
|
+import json
|
|
|
+import redis
|
|
|
+import requests
|
|
|
+import logging
|
|
|
+
|
|
|
+# Configure logging
|
|
|
+logging.basicConfig(
|
|
|
+ level=logging.INFO,
|
|
|
+ format='%(asctime)s [%(levelname)s] %(message)s',
|
|
|
+ handlers=[logging.StreamHandler()]
|
|
|
+)
|
|
|
+
|
|
|
+# Configuration from Environment
|
|
|
+BOT_TOKEN = os.getenv("BOT_TOKEN")
|
|
|
+REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
|
|
|
+REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
|
|
|
+QUEUE_NAME = "messages_queue"
|
|
|
+
|
|
|
+def send_telegram_message(chat_id, text):
|
|
|
+ if not BOT_TOKEN:
|
|
|
+ logging.error("BOT_TOKEN not found in environment variables")
|
|
|
+ return False
|
|
|
+
|
|
|
+ url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
|
|
|
+ payload = {
|
|
|
+ "chat_id": chat_id,
|
|
|
+ "text": text,
|
|
|
+ "parse_mode": "HTML"
|
|
|
+ }
|
|
|
+
|
|
|
+ try:
|
|
|
+ response = requests.post(url, json=payload, timeout=10)
|
|
|
+ response.raise_for_status()
|
|
|
+ logging.info(f"Successfully sent message to {chat_id}")
|
|
|
+ return True
|
|
|
+ except Exception as e:
|
|
|
+ logging.error(f"Failed to send message to {chat_id}: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+def main():
|
|
|
+ logging.info("Telegram Daemon starting...")
|
|
|
+
|
|
|
+ if not BOT_TOKEN:
|
|
|
+ logging.error("BOT_TOKEN is not set! Daemon will exit.")
|
|
|
+ return
|
|
|
+
|
|
|
+ try:
|
|
|
+ r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
|
|
|
+ # Test connection
|
|
|
+ r.ping()
|
|
|
+ logging.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
|
|
|
+ except Exception as e:
|
|
|
+ logging.error(f"Could not connect to Redis: {e}")
|
|
|
+ return
|
|
|
+
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ # BLPOP blocks until a message is available
|
|
|
+ # It returns a tuple (queue_name, value)
|
|
|
+ item = r.blpop(QUEUE_NAME, timeout=10)
|
|
|
+
|
|
|
+ if item:
|
|
|
+ _, value = item
|
|
|
+ try:
|
|
|
+ data = json.loads(value)
|
|
|
+ chat_id = data.get("id")
|
|
|
+ message = data.get("message")
|
|
|
+
|
|
|
+ if chat_id and message:
|
|
|
+ send_telegram_message(chat_id, message)
|
|
|
+
|
|
|
+ # Anti-spam/Rate-limit logic
|
|
|
+ queue_len = r.llen(QUEUE_NAME)
|
|
|
+ if queue_len > 2:
|
|
|
+ logging.info(f"Queue size is {queue_len} (> 2). Sleeping for 30s...")
|
|
|
+ time.sleep(30)
|
|
|
+ else:
|
|
|
+ # Small pause to avoid CPU hogging
|
|
|
+ time.sleep(0.5)
|
|
|
+ else:
|
|
|
+ logging.warning(f"Invalid message format: {value}")
|
|
|
+
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ logging.error(f"Failed to decode message JSON: {value}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logging.error(f"Error in daemon loop: {e}")
|
|
|
+ time.sleep(5) # Wait before retry
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|