# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from flask import Flask, request

from cloudevents.http import from_http

app = Flask(__name__)


# Endpoint exposed at http://localhost:3000/ when this file is run as a script.
@app.route("/", methods=["POST"])
def home():
    """Turn the POSTed request into a CloudEvent and print a summary of it.

    The request headers and raw body are handed to ``from_http``; the
    resulting event's ``id``, ``source``, ``type`` and ``specversion``
    attributes are printed. Responds with an empty body and status 204.
    """
    cloud_event = from_http(request.headers, request.get_data())

    # CloudEvent attributes are read with dict-style subscripting.
    event_id = cloud_event["id"]
    event_source = cloud_event["source"]
    event_type = cloud_event["type"]
    spec_version = cloud_event["specversion"]

    print(
        f"Found {event_id} from {event_source} with type "
        f"{event_type} and specversion {spec_version}"
    )
    return "", 204


if __name__ == "__main__":
    app.run(port=3000)