Интеграция Firecrawl с Zapier для no-code автоматизации и workflow
import requests
import time
import schedule
# Конфигурация
FIRECRAWL_API_KEY = "your_firecrawl_api_key"
ZAPIER_WEBHOOK_URL = "your_zapier_webhook_url"
FIRECRAWL_BASE_URL = "https://api.firecrawl.ru/v1"
def monitor_competitor_prices():
"""Мониторинг цен конкурентов"""
competitors = [
{"name": "Конкурент 1", "url": "https://competitor1.com/pricing"},
{"name": "Конкурент 2", "url": "https://competitor2.com/plans"},
]
headers = {"X-API-Key": FIRECRAWL_API_KEY}
for competitor in competitors:
try:
# Парсим страницу с ценами
response = requests.post(f"{FIRECRAWL_BASE_URL}/scrape",
headers=headers,
json={
"url": competitor["url"],
"formats": ["markdown"],
"onlyMainContent": True
}
)
data = response.json()
if data.get('success'):
content = data['data']['markdown']
# Ищем упоминания цен (простая проверка)
price_keywords = ['$', '₽', 'руб', 'бесплатно', 'free', 'price']
has_price_changes = any(keyword.lower() in content.lower()
for keyword in price_keywords)
if has_price_changes:
# Отправляем данные в Zapier
zapier_payload = {
"competitor": competitor["name"],
"url": competitor["url"],
"content_preview": content[:500] + "...",
"full_content": content,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"alert_type": "price_update"
}
# Webhook в Zapier
zapier_response = requests.post(ZAPIER_WEBHOOK_URL,
json=zapier_payload)
if zapier_response.status_code == 200:
print(f"* Уведомление отправлено для {competitor['name']}")
else:
print(f"❌ Ошибка Zapier для {competitor['name']}")
except Exception as e:
print(f"❌ Ошибка мониторинга {competitor['name']}: {e}")
def search_and_notify(search_query: str, notification_type: str):
"""Поиск контента с уведомлением"""
headers = {"X-API-Key": FIRECRAWL_API_KEY}
# AI поиск с анализом
response = requests.post(f"{FIRECRAWL_BASE_URL}/search",
headers=headers,
json={
"query": search_query,
"limit": 5,
"parseResults": True,
"aiPrompt": f"Проанализируй ключевые новости и тренды по теме: {search_query}"
}
)
data = response.json()
if data.get('success'):
# Подготовка данных для Zapier
zapier_payload = {
"search_query": search_query,
"notification_type": notification_type,
"ai_analysis": data['data'].get('markdown', 'Анализ недоступен'),
"sources": data['data'].get('pages', [])[:3], # Топ-3 источника
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"total_found": len(data['data'].get('pages', []))
}
# Отправка в Zapier
zapier_response = requests.post(ZAPIER_WEBHOOK_URL, json=zapier_payload)
if zapier_response.status_code == 200:
print(f"* Дайджест по '{search_query}' отправлен")
return True
return False
# Планирование задач
schedule.every(2).hours.do(monitor_competitor_prices)
schedule.every().day.at("09:00").do(lambda: search_and_notify("ИИ новости", "daily_digest"))
schedule.every().monday.at("10:00").do(lambda: search_and_notify("технологические тренды", "weekly_report"))
print("🚀 Автоматизация через Zapier запущена!")
# Основной цикл
while True:
schedule.run_pending()
time.sleep(60)
🔔 Новое обновление от конкурента {{competitor}}
📍 URL: {{url}}
🕐 Время: {{timestamp}}
📄 Превью:
{{content_preview}}
🔗 [Подробнее]({{url}})
{{timestamp}}{{competitor}}{{url}}{{alert_type}}{{content_preview}}Обновление от {{competitor}} - {{timestamp}}<h2>Обновление конкурентов</h2>
<p><strong>Компания:</strong> {{competitor}}</p>
<p><strong>URL:</strong> <a href="{{url}}">{{url}}</a></p>
<p><strong>Время:</strong> {{timestamp}}</p>
<h3>Контент:</h3>
<div style="background: #f5f5f5; padding: 15px;">
{{full_content}}
</div>
def news_aggregator():
"""Агрегация новостей с AI анализом"""
topics = [
"искусственный интеллект новости",
"блокчейн технологии",
"стартапы инвестиции"
]
headers = {"X-API-Key": FIRECRAWL_API_KEY}
digest = []
for topic in topics:
response = requests.post(f"{FIRECRAWL_BASE_URL}/search",
headers=headers,
json={
"query": topic,
"limit": 3,
"parseResults": True,
"aiPrompt": f"Создай краткое резюме главных новостей по теме: {topic}"
}
)
data = response.json()
if data.get('success'):
digest.append({
"topic": topic,
"analysis": data['data'].get('markdown', ''),
"sources": data['data'].get('pages', [])[:2]
})
# Отправка дайджеста в Zapier
zapier_payload = {
"digest_type": "daily_news",
"date": time.strftime("%Y-%m-%d"),
"topics_count": len(digest),
"digest_content": digest,
"generated_at": time.strftime("%H:%M")
}
return requests.post(ZAPIER_WEBHOOK_URL, json=zapier_payload)
# Ежедневный дайджест в 9:00
schedule.every().day.at("09:00").do(news_aggregator)
def job_hunter():
"""Поиск вакансий с автоматической подачей заявок"""
job_searches = [
"python разработчик вакансии",
"data scientist работа",
"машинное обучение инженер"
]
headers = {"X-API-Key": FIRECRAWL_API_KEY}
found_jobs = []
for search in job_searches:
response = requests.post(f"{FIRECRAWL_BASE_URL}/search",
headers=headers,
json={
"query": search,
"limit": 5,
"categories": ["jobs"] # если поддерживается
}
)
data = response.json()
if data.get('success'):
for job in data['data']['pages'][:2]: # Топ-2 по каждому запросу
# Дополнительно парсим страницу вакансии
job_details = requests.post(f"{FIRECRAWL_BASE_URL}/scrape",
headers=headers,
json={
"url": job["url"],
"formats": ["markdown"],
"onlyMainContent": True
}
)
job_data = job_details.json()
if job_data.get('success'):
# AI анализ соответствия
ai_analysis = requests.post(f"{FIRECRAWL_BASE_URL}/search",
headers=headers,
json={
"query": f"анализ вакансии {job['title']}",
"limit": 1,
"parseResults": True,
"aiPrompt": f"Оцени подходит ли эта вакансия для Python разработчика с 3 годами опыта: {job_data['data']['markdown'][:1000]}"
}
)
found_jobs.append({
"title": job["title"],
"url": job["url"],
"description": job["description"],
"full_details": job_data['data']['markdown'][:1000],
"ai_match_score": "Подходит" if "подходит" in ai_analysis.json().get('data', {}).get('markdown', '').lower() else "Не подходит",
"search_query": search
})
# Отправка в Zapier для обработки
zapier_payload = {
"scan_type": "job_search",
"jobs_found": len(found_jobs),
"jobs": found_jobs,
"scan_date": time.strftime("%Y-%m-%d %H:%M")
}
return requests.post(ZAPIER_WEBHOOK_URL, json=zapier_payload)
# Поиск вакансий каждые 4 часа в рабочее время
schedule.every().day.at("09:00").do(job_hunter)
schedule.every().day.at("13:00").do(job_hunter)
schedule.every().day.at("17:00").do(job_hunter)
def social_media_content():
"""Генерация контента для соцсетей"""
trending_topics = [
"технологические новости сегодня",
"стартап успех история",
"ИИ прорыв 2025"
]
headers = {"X-API-Key": FIRECRAWL_API_KEY}
social_posts = []
for topic in trending_topics:
response = requests.post(f"{FIRECRAWL_BASE_URL}/search",
headers=headers,
json={
"query": topic,
"limit": 2,
"parseResults": True,
"aiPrompt": f"Создай интересный пост для LinkedIn на 150 слов по теме: {topic}. Добавь хэштеги и call-to-action"
}
)
data = response.json()
if data.get('success'):
social_posts.append({
"topic": topic,
"post_content": data['data'].get('markdown', ''),
"sources": [page['url'] for page in data['data'].get('pages', [])[:2]],
"suggested_time": "optimal" # Можно добавить логику оптимального времени
})
# Отправка в Zapier для публикации
zapier_payload = {
"content_type": "social_media_batch",
"posts_count": len(social_posts),
"posts": social_posts,
"generated_at": time.strftime("%Y-%m-%d %H:%M:%S")
}
return requests.post(ZAPIER_WEBHOOK_URL, json=zapier_payload)
# Генерация контента 3 раза в неделю
schedule.every().monday.at("10:00").do(social_media_content)
schedule.every().wednesday.at("14:00").do(social_media_content)
schedule.every().friday.at("16:00").do(social_media_content)
content_type equals social_media_batch{{posts.0.post_content}}{{search_query}}{{ai_analysis}}{{timestamp}}{{sources}}{{competitor}}{{timestamp}}{{content_preview}}Trigger: Webhook
Filter: alert_type = "price_update"
Action: Slack Channel Message
Template: "🔔 {{competitor}} обновил цены: {{url}}"
Trigger: Webhook
Filter: notification_type = "daily_digest"
Action: Gmail Send Email
Subject: "Дайджест новостей {{timestamp}}"
Body: {{ai_analysis}}
Trigger: Webhook
Filter: scan_type = "job_search"
Action 1: Google Sheets - Add Row
Action 2: Gmail - Send if ai_match_score = "Подходит"
import json
from datetime import datetime, timedelta
class ZapierAnalytics:
def __init__(self):
self.webhook_calls = []
self.success_rate = {}
def log_webhook_call(self, payload: dict, response_code: int):
"""Логирование вызовов webhook"""
self.webhook_calls.append({
'timestamp': datetime.now(),
'payload_type': payload.get('alert_type', 'unknown'),
'response_code': response_code,
'success': response_code == 200
})
def enhanced_webhook_call(self, payload: dict, webhook_url: str):
"""Расширенный вызов webhook с аналитикой"""
try:
response = requests.post(webhook_url, json=payload)
self.log_webhook_call(payload, response.status_code)
if response.status_code != 200:
print(f"⚠️ Zapier webhook failed: {response.status_code}")
# Можно добавить retry логику
return response
except Exception as e:
self.log_webhook_call(payload, 0)
print(f"❌ Webhook error: {e}")
def get_webhook_stats(self):
"""Статистика webhook вызовов"""
if not self.webhook_calls:
return {}
total_calls = len(self.webhook_calls)
successful_calls = sum(1 for call in self.webhook_calls if call['success'])
# Статистика за последние 24 часа
day_ago = datetime.now() - timedelta(hours=24)
recent_calls = [call for call in self.webhook_calls if call['timestamp'] > day_ago]
return {
'total_webhook_calls': total_calls,
'success_rate': successful_calls / total_calls * 100,
'calls_last_24h': len(recent_calls),
'most_frequent_type': max(set(call['payload_type'] for call in self.webhook_calls),
key=lambda x: sum(1 for call in self.webhook_calls if call['payload_type'] == x))
}
# Использование аналитики
analytics = ZapierAnalytics()
def monitored_webhook_call(payload):
return analytics.enhanced_webhook_call(payload, ZAPIER_WEBHOOK_URL)
# Отправка статистики раз в день
def daily_analytics_report():
stats = analytics.get_webhook_stats()
report_payload = {
"report_type": "zapier_analytics",
"date": datetime.now().strftime("%Y-%m-%d"),
"statistics": stats
}
# Отправка отчета в отдельный Zapier webhook для аналитики
requests.post("YOUR_ANALYTICS_WEBHOOK_URL", json=report_payload)
schedule.every().day.at("23:55").do(daily_analytics_report)
# Batch запросы для ускорения
def batch_scrape_for_zapier(urls_batch):
results = []
for url in urls_batch:
# Scrape каждый URL
result = scrape_single_url(url)
results.append(result)
# Одна отправка в Zapier для всего batch
zapier_payload = {
"batch_type": "bulk_scrape",
"results": results,
"processed_count": len(results)
}
return requests.post(ZAPIER_WEBHOOK_URL, json=zapier_payload)
def robust_zapier_integration():
max_retries = 3
retry_delay = 5
for attempt in range(max_retries):
try:
# Основная логика
result = process_and_send_to_zapier()
return result
except Exception as e:
if attempt < max_retries - 1:
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {retry_delay}s...")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
print(f"All attempts failed: {e}")
# Отправка уведомления об ошибке
error_payload = {
"error_type": "integration_failure",
"error_message": str(e),
"timestamp": datetime.now().isoformat()
}
requests.post(ERROR_WEBHOOK_URL, json=error_payload)