chatgpt/webapp/celery-test.py

38 lines
904 B
Python
Executable File

#!/usr/bin/python3
from celery import Celery
from celery.backends import sqlalchemy
from flask import Flask
import requests
# create Flask app
app = Flask(__name__)
# create a Celery app
celery_app = Celery('tasks')
# configure the app to use the sqlalchemy backend and specify the SQLite database URL as the broker
app.conf.update(
result_backend='sqlalchemy',
broker_url='sqlite:///celery.db',
task_serializer='json',
result_serializer='json',
)
@celery_app.task
def get_public_ip():
response = requests.get('https://ifconfig.me/ip')
return response.text
# define a Flask route that returns the current public IP address
@app.route('/public_ip')
def public_ip():
# call the get_public_ip task and wait for it to complete
result = get_public_ip.delay().get()
# return the task result as the response
return result
if __name__ == '__main__':
app.run()