البرمجة غير المتزامنة باستخدام asyncio في بايثون
مقدمة
تعتبر البرمجة غير المتزامنة (Asynchronous Programming) من الأساليب الهامة لتحسين أداء التطبيقات التي تعتمد على إدخال/إخراج (I/O) طويل. مكتبة asyncio
في بايثون توفر أدوات قوية لإنشاء وبرمجة عمليات غير متزامنة بشكل فعال. في هذا الدليل، سنتناول كيفية استخدام asyncio
لكتابة وظائف غير متزامنة والتعامل مع عمليات الإدخال/الإخراج بكفاءة.
الخطوة 1: إعداد البيئة
- تثبيت بايثون: تأكد من أنك تستخدم بايثون 3.7 أو أعلى. يمكنك تنزيل بايثون من الموقع الرسمي.
- إعداد بيئة العمل: تأكد من أنك قد أنشأت بيئة افتراضية لمشروعك. يمكنك إنشاء بيئة افتراضية باستخدام:
python -m venv myenv
source myenv/bin/activate # على أنظمة Unix
myenv\Scripts\activate # على أنظمة Windows
الخطوة 2: إنشاء وظائف غير متزامنة
- تعريف وظيفة غير متزامنة:
- استخدم الكلمة الأساسية
async
لتحديد وظيفة غير متزامنة. - استخدم
await
لاستدعاء وظائف غير متزامنة أخرى بداخلها.
- استخدم الكلمة الأساسية
import asyncio
async def say_hello():
print("Hello")
await asyncio.sleep(1)
print("World")
asyncio.run(say_hello())
في هذا المثال، say_hello
هي وظيفة غير متزامنة تقوم بطباعة “Hello”، ثم تنتظر لمدة ثانية، ثم تطبع “World”.
الخطوة 3: استخدام asyncio.gather
لتنفيذ مهام متعددة
- تعريف مهام متعددة:
- استخدم
asyncio.gather
لتشغيل وظائف غير متزامنة متعددة في وقت واحد.
- استخدم
import asyncio
async def task1():
await asyncio.sleep(2)
print("Task 1 completed")
async def task2():
await asyncio.sleep(1)
print("Task 2 completed")
async def main():
await asyncio.gather(task1(), task2())
asyncio.run(main())
في هذا المثال، task1
و task2
يتم تشغيلهما بالتوازي، ويتم استخدام asyncio.gather
لانتظار انتهاء كلتا المهام.
الخطوة 4: إدارة الوقت باستخدام asyncio.wait
- انتظار انتهاء بعض المهام:
- استخدم
asyncio.wait
لإدارة توقيت انتهاء المهام.
- استخدم
import asyncio
async def slow_task():
await asyncio.sleep(3)
print("Slow task finished")
async def fast_task():
await asyncio.sleep(1)
print("Fast task finished")
async def main():
slow = asyncio.create_task(slow_task())
fast = asyncio.create_task(fast_task())
done, pending = await asyncio.wait([slow, fast], timeout=2)
for task in done:
print(f"{task.get_name()} is done")
for task in pending:
print(f"{task.get_name()} is still pending")
task.cancel()
asyncio.run(main())
في هذا المثال، يتم استخدام asyncio.wait
للانتظار حتى تنتهي المهام أو حتى ينقضي الوقت المحدد.
الخطوة 5: التعامل مع استثناءات في الوظائف غير المتزامنة
- معالجة الاستثناءات:
- استخدم
try
وexcept
لمعالجة الأخطاء في الوظائف غير المتزامنة.
- استخدم
import asyncio
async def risky_task():
await asyncio.sleep(2)
raise ValueError("Something went wrong!")
async def main():
try:
await risky_task()
except ValueError as e:
print(f"Caught an exception: {e}")
asyncio.run(main())
في هذا المثال، يتم استخدام try
و except
للتعامل مع الاستثناءات التي قد تحدث أثناء تنفيذ وظيفة غير متزامنة.
الخاتمة
في هذا الدليل، تعلمت كيفية استخدام مكتبة asyncio
في بايثون لكتابة برامج غير متزامنة. لقد غطينا كيفية إنشاء وظائف غير متزامنة، وتنفيذ مهام متعددة بالتوازي، وإدارة الوقت، والتعامل مع الاستثناءات. باستخدام asyncio
، يمكنك تحسين أداء التطبيقات التي تعتمد على عمليات الإدخال/الإخراج غير المتزامنة، مما يجعلها أكثر كفاءة وسرعة.
اترك تعليقاً