| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- import os
- import time
- import json
- import redis
- import requests
- import logging
# Root logger setup: INFO and above, timestamp + level prefix, emitted through
# a single StreamHandler.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
    handlers=[logging.StreamHandler()],
)

# Runtime settings, read once from the process environment at import time.
# BOT_TOKEN has no default and is None when the variable is unset.
BOT_TOKEN = os.environ.get("BOT_TOKEN")
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))

# Name of the Redis list the daemon pops outgoing messages from.
QUEUE_NAME = "messages_queue"
def send_telegram_message(chat_id, text):
    """Send *text* to *chat_id* through the Telegram Bot API ``sendMessage`` call.

    The message is posted as JSON with ``parse_mode`` set to ``"HTML"`` and a
    10-second request timeout.

    Args:
        chat_id: Identifier of the target chat, passed through unchanged.
        text: Message body, passed through unchanged.

    Returns:
        True when the request completed with a successful HTTP status,
        False when BOT_TOKEN is unset or the request failed for any reason.
        This function never raises; failures are logged instead.
    """
    if not BOT_TOKEN:
        logging.error("BOT_TOKEN not found in environment variables")
        return False

    url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "HTML",
    }

    try:
        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()
    except Exception as e:
        # Deliberately broad: callers rely on a boolean result, not exceptions.
        # The request URL embeds the bot token and exception text from
        # requests can echo that URL, so scrub the secret before logging.
        reason = str(e).replace(BOT_TOKEN, "<redacted>")
        logging.error(f"Failed to send message to {chat_id}: {reason}")
        return False

    logging.info(f"Successfully sent message to {chat_id}")
    return True
def _handle_queue_item(r, value):
    """Decode one queued payload, send it, and apply the post-send pause.

    Args:
        r: Connected Redis client, used to read the remaining queue length.
        value: Raw payload popped from the queue; expected to be a JSON
            object with ``"id"`` and ``"message"`` keys.
    """
    try:
        data = json.loads(value)
    except json.JSONDecodeError:
        logging.error(f"Failed to decode message JSON: {value}")
        return

    # Valid JSON that is not an object (list, string, number, null) has no
    # .get(); treat it as a malformed message instead of letting the
    # AttributeError escape and stall the daemon loop.
    if not isinstance(data, dict):
        logging.warning(f"Invalid message format: {value}")
        return

    chat_id = data.get("id")
    message = data.get("message")
    if not (chat_id and message):
        logging.warning(f"Invalid message format: {value}")
        return

    send_telegram_message(chat_id, message)

    # Anti-spam/rate-limit logic: back off for a long time while a backlog
    # remains, otherwise take only a short pause between sends.
    queue_len = r.llen(QUEUE_NAME)
    if queue_len > 2:
        logging.info(f"Queue size is {queue_len} (> 2). Sleeping for 30s...")
        time.sleep(30)
    else:
        time.sleep(0.5)


def main():
    """Run the daemon: pop messages from the Redis queue and send them.

    Returns early (after logging an error) when BOT_TOKEN is unset or the
    initial Redis connection test fails. Otherwise loops forever; any
    exception raised inside one loop iteration is logged and followed by a
    5-second wait before the next attempt.
    """
    logging.info("Telegram Daemon starting...")

    if not BOT_TOKEN:
        logging.error("BOT_TOKEN is not set! Daemon will exit.")
        return

    try:
        r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
        # Test connection
        r.ping()
        logging.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
    except Exception as e:
        logging.error(f"Could not connect to Redis: {e}")
        return

    while True:
        try:
            # BLPOP blocks until a message is available or the timeout
            # expires; on success it yields a (queue_name, value) pair.
            item = r.blpop(QUEUE_NAME, timeout=10)
            if item:
                _, value = item
                _handle_queue_item(r, value)
        except Exception as e:
            logging.error(f"Error in daemon loop: {e}")
            time.sleep(5)  # Wait before retry
# Start the daemon only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|