#!/usr/bin/env python3
"""
Test script for Opportunity Management Module

Runs two integration checks against the ``opportunity_management`` package
(create + fetch an opportunity, then list opportunities) and prints a
human-readable report. The process exit status is 0 when every check
passed and 1 otherwise.
"""

import asyncio
import sys
import os

# Add the module to Python path.
# This must run before the project import below, so the import order is
# intentional.
sys.path.insert(0, os.path.expanduser('~/repos/opportunity_management'))

from opportunity_management import (  # noqa: E402
    create_opportunity,
    get_opportunity_by_id,
    list_opportunities,
)


async def test_opportunity_creation() -> bool:
    """Create an opportunity, then read it back.

    Returns:
        True only if the opportunity was created AND could be retrieved
        afterwards; False if any call raised or the lookup returned
        nothing.
    """
    print("Testing opportunity creation...")

    # Test data
    opportunity_data = {
        'customer_name': 'Test Customer',
        'estimated_amount': 100000.00,
        # Chinese for "initial contact"; presumably one of the stage values
        # the module accepts -- TODO confirm against the module.
        'sales_stage': '初步接洽',
        'expected_close_date': '2026-12-31',
        'description': 'Test opportunity created by automated test',
        'source': 'manual',
    }

    # Broad catch is deliberate: this is the top-level boundary of a test
    # script, and any failure should be reported rather than crash the run.
    try:
        # Create opportunity (using dummy org_id and user_id for testing)
        opportunity_id = await create_opportunity(
            opportunity_data, 'test_user', 'test_org'
        )
        print(f"✓ Opportunity created successfully: {opportunity_id}")

        # Retrieve the created opportunity
        opportunity = await get_opportunity_by_id(opportunity_id, 'test_org')
        if not opportunity:
            # A failed read-back is a test failure, not a pass.
            print("✗ Failed to retrieve opportunity")
            return False

        print("✓ Opportunity retrieved successfully")
        print(f" Customer: {opportunity['customer_name']}")
        print(f" Amount: {opportunity['estimated_amount']}")
        print(f" Stage: {opportunity['sales_stage']}")
        return True
    except Exception as e:
        print(f"✗ Error creating opportunity: {e}")
        return False


async def test_opportunity_listing() -> bool:
    """List opportunities for the test organisation.

    Returns:
        True if ``list_opportunities`` completed and its result could be
        unpacked into ``(opportunities, total)``; False if anything raised.
    """
    print("\nTesting opportunity listing...")

    try:
        opportunities, total = await list_opportunities(
            'test_org', page=1, page_size=10
        )
        print(f"✓ Retrieved {len(opportunities)} opportunities out of {total} total")
        return True
    except Exception as e:
        print(f"✗ Error listing opportunities: {e}")
        return False


async def main() -> bool:
    """Run every check in order and print a summary.

    Returns:
        True if all checks passed, False otherwise.
    """
    banner = "=" * 50
    print("Opportunity Management Module - Integration Test")
    print(banner)

    # Every check runs even if an earlier one failed, so the report is
    # complete; the list is evaluated strictly in order.
    results = [
        await test_opportunity_creation(),
        await test_opportunity_listing(),
    ]
    success = all(results)

    print("\n" + banner)
    if success:
        print("✓ All tests passed! Module is working correctly.")
    else:
        print("✗ Some tests failed. Please check the implementation.")

    return success


if __name__ == "__main__":
    # Propagate the outcome as the exit status so shells/CI can detect failure.
    sys.exit(0 if asyncio.run(main()) else 1)