# telegram_daemon.py (2.9 KB)
  1. import os
  2. import time
  3. import json
  4. import redis
  5. import requests
  6. import logging
  7. # Configure logging
  8. logging.basicConfig(
  9. level=logging.INFO,
  10. format='%(asctime)s [%(levelname)s] %(message)s',
  11. handlers=[logging.StreamHandler()]
  12. )
  13. # Configuration from Environment
  14. BOT_TOKEN = os.getenv("BOT_TOKEN")
  15. REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
  16. REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
  17. QUEUE_NAME = "messages_queue"
  18. def send_telegram_message(chat_id, text):
  19. if not BOT_TOKEN:
  20. logging.error("BOT_TOKEN not found in environment variables")
  21. return False
  22. url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
  23. payload = {
  24. "chat_id": chat_id,
  25. "text": text,
  26. "parse_mode": "HTML"
  27. }
  28. try:
  29. response = requests.post(url, json=payload, timeout=10)
  30. response.raise_for_status()
  31. logging.info(f"Successfully sent message to {chat_id}")
  32. return True
  33. except Exception as e:
  34. logging.error(f"Failed to send message to {chat_id}: {e}")
  35. return False
  36. def main():
  37. logging.info("Telegram Daemon starting...")
  38. if not BOT_TOKEN:
  39. logging.error("BOT_TOKEN is not set! Daemon will exit.")
  40. return
  41. try:
  42. r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
  43. # Test connection
  44. r.ping()
  45. logging.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
  46. except Exception as e:
  47. logging.error(f"Could not connect to Redis: {e}")
  48. return
  49. while True:
  50. try:
  51. # BLPOP blocks until a message is available
  52. # It returns a tuple (queue_name, value)
  53. item = r.blpop(QUEUE_NAME, timeout=10)
  54. if item:
  55. _, value = item
  56. try:
  57. data = json.loads(value)
  58. chat_id = data.get("id")
  59. message = data.get("message")
  60. if chat_id and message:
  61. send_telegram_message(chat_id, message)
  62. # Anti-spam/Rate-limit logic
  63. queue_len = r.llen(QUEUE_NAME)
  64. if queue_len > 2:
  65. logging.info(f"Queue size is {queue_len} (> 2). Sleeping for 30s...")
  66. time.sleep(30)
  67. else:
  68. # Small pause to avoid CPU hogging
  69. time.sleep(0.5)
  70. else:
  71. logging.warning(f"Invalid message format: {value}")
  72. except json.JSONDecodeError:
  73. logging.error(f"Failed to decode message JSON: {value}")
  74. except Exception as e:
  75. logging.error(f"Error in daemon loop: {e}")
  76. time.sleep(5) # Wait before retry
  77. if __name__ == "__main__":
  78. main()