License | BSD3 |
---|---|
Stability | stable |
Safe Haskell | None |
Language | Haskell2010 |
AWS.Lambda.Events.Kafka
Contents
Description
It is possible to subscribe Lambda functions to Kafka topics. You can subscribe to topics from Amazon Managed Streaming for Kafka (MSK) as well as self-managed Kafka clusters.
Lambda considers Amazon Managed Streaming for Kafka (MSK) to be a different event source type from a self-managed Apache Kafka cluster, but their payloads are very similar. The types in this module are derived from inspecting invocation payloads, and from reading the following links:
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
- https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/
- https://aws.amazon.com/blogs/compute/using-self-hosted-apache-kafka-as-an-event-source-for-aws-lambda/
Synopsis
- data KafkaEvent = KafkaEvent {
- eventSource :: !EventSource
- eventSourceArn :: !(Maybe Text)
- bootstrapServers :: !(NonEmpty Text)
- records :: !(Map Text [Record])
- data EventSource
- type Record = Record' ByteString
- data Record' a = Record {}
- data Header = Header !Text !ByteString
- data Timestamp
- parseTimestamp :: Object -> Parser Timestamp
- unparseTimestamp :: KeyValue e kv => Timestamp -> [kv]
- int64ToUTCTime :: Int64 -> UTCTime
- utcTimeToInt64 :: UTCTime -> Int64
Documentation
data KafkaEvent Source #
Represents an event from either Amazon MSK or a self-managed Apache Kafka cluster, as the payloads are very similar.
The ToJSON
and FromJSON
instances on Record
perform base64
conversion for you.
See the AWS documentation for a sample payload.
Constructors
KafkaEvent | |
Fields
|
Instances
data EventSource Source #
Constructors
AwsKafka | "aws:kafka" |
SelfManagedKafka | SelfManagedKafka |
Instances
FromJSON EventSource Source # | |||||
Defined in AWS.Lambda.Events.Kafka | |||||
ToJSON EventSource Source # | |||||
Defined in AWS.Lambda.Events.Kafka Methods toJSON :: EventSource -> Value # toEncoding :: EventSource -> Encoding # toJSONList :: [EventSource] -> Value # toEncodingList :: [EventSource] -> Encoding # omitField :: EventSource -> Bool # | |||||
Bounded EventSource Source # | |||||
Defined in AWS.Lambda.Events.Kafka | |||||
Enum EventSource Source # | |||||
Defined in AWS.Lambda.Events.Kafka Methods succ :: EventSource -> EventSource # pred :: EventSource -> EventSource # toEnum :: Int -> EventSource # fromEnum :: EventSource -> Int # enumFrom :: EventSource -> [EventSource] # enumFromThen :: EventSource -> EventSource -> [EventSource] # enumFromTo :: EventSource -> EventSource -> [EventSource] # enumFromThenTo :: EventSource -> EventSource -> EventSource -> [EventSource] # | |||||
Generic EventSource Source # | |||||
Defined in AWS.Lambda.Events.Kafka Associated Types
| |||||
Show EventSource Source # | |||||
Defined in AWS.Lambda.Events.Kafka Methods showsPrec :: Int -> EventSource -> ShowS # show :: EventSource -> String # showList :: [EventSource] -> ShowS # | |||||
Eq EventSource Source # | |||||
Defined in AWS.Lambda.Events.Kafka | |||||
Ord EventSource Source # | |||||
Defined in AWS.Lambda.Events.Kafka Methods compare :: EventSource -> EventSource -> Ordering # (<) :: EventSource -> EventSource -> Bool # (<=) :: EventSource -> EventSource -> Bool # (>) :: EventSource -> EventSource -> Bool # (>=) :: EventSource -> EventSource -> Bool # max :: EventSource -> EventSource -> EventSource # min :: EventSource -> EventSource -> EventSource # | |||||
type Rep EventSource Source # | |||||
type Record = Record' ByteString Source #
Convenience alias: most of the time you will parse the records straight into some app-specific structure.
Records from a Kafka event. This is Traversable
, which means
you can do things like parse a JSON-encoded payload:
traverse
decodeStrict
::FromJSON
a => Record -> Maybe (Record' a)
Constructors
Record | |
Instances
Functor Record' Source # | |||||
Foldable Record' Source # | |||||
Defined in AWS.Lambda.Events.Kafka Methods fold :: Monoid m => Record' m -> m # foldMap :: Monoid m => (a -> m) -> Record' a -> m # foldMap' :: Monoid m => (a -> m) -> Record' a -> m # foldr :: (a -> b -> b) -> b -> Record' a -> b # foldr' :: (a -> b -> b) -> b -> Record' a -> b # foldl :: (b -> a -> b) -> b -> Record' a -> b # foldl' :: (b -> a -> b) -> b -> Record' a -> b # foldr1 :: (a -> a -> a) -> Record' a -> a # foldl1 :: (a -> a -> a) -> Record' a -> a # elem :: Eq a => a -> Record' a -> Bool # maximum :: Ord a => Record' a -> a # minimum :: Ord a => Record' a -> a # | |||||
Traversable Record' Source # | |||||
FromJSON (Record' ByteString) Source # | Decodes base64-encoded keys and values, where present. | ||||
Defined in AWS.Lambda.Events.Kafka Methods parseJSON :: Value -> Parser (Record' ByteString) # parseJSONList :: Value -> Parser [Record' ByteString] # omittedField :: Maybe (Record' ByteString) # | |||||
ToJSON (Record' ByteString) Source # | Encodes keys and values into base64. | ||||
Defined in AWS.Lambda.Events.Kafka Methods toJSON :: Record' ByteString -> Value # toEncoding :: Record' ByteString -> Encoding # toJSONList :: [Record' ByteString] -> Value # toEncodingList :: [Record' ByteString] -> Encoding # omitField :: Record' ByteString -> Bool # | |||||
Generic (Record' a) Source # | |||||
Defined in AWS.Lambda.Events.Kafka Associated Types
| |||||
Show a => Show (Record' a) Source # | |||||
Eq a => Eq (Record' a) Source # | |||||
type Rep (Record' a) Source # | |||||
Defined in AWS.Lambda.Events.Kafka type Rep (Record' a) = D1 ('MetaData "Record'" "AWS.Lambda.Events.Kafka" "hal-1.1-ZuKBgFp1BmBIjVnwMMR99" 'False) (C1 ('MetaCons "Record" 'PrefixI 'True) ((S1 ('MetaSel ('Just "topic") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Text) :*: (S1 ('MetaSel ('Just "partition") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Int32) :*: S1 ('MetaSel ('Just "offset") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Int64))) :*: ((S1 ('MetaSel ('Just "timestamp") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Timestamp) :*: S1 ('MetaSel ('Just "headers") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 [Header])) :*: (S1 ('MetaSel ('Just "key") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 (Maybe ByteString)) :*: S1 ('MetaSel ('Just "value") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 (Maybe a)))))) |
AWS serialises record headers to JSON as an array of objects. From their docs:
"headers":[{"headerKey":[104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
Note:
>>>
map chr [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
"headerValue"
Constructors
Header !Text !ByteString |
Instances
FromJSON Header Source # | |||||
Defined in AWS.Lambda.Events.Kafka | |||||
ToJSON Header Source # | |||||
Generic Header Source # | |||||
Defined in AWS.Lambda.Events.Kafka Associated Types
| |||||
Show Header Source # | |||||
Eq Header Source # | |||||
type Rep Header Source # | |||||
Defined in AWS.Lambda.Events.Kafka type Rep Header = D1 ('MetaData "Header" "AWS.Lambda.Events.Kafka" "hal-1.1-ZuKBgFp1BmBIjVnwMMR99" 'False) (C1 ('MetaCons "Header" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Text) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 ByteString))) |
Kafka timestamp types, derived from the Java client's enum at: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
Constructors
NoTimestampType | |
CreateTime !UTCTime | |
LogAppendTime !UTCTime |
Instances
Generic Timestamp Source # | |||||
Defined in AWS.Lambda.Events.Kafka Associated Types
| |||||
Show Timestamp Source # | |||||
Eq Timestamp Source # | |||||
type Rep Timestamp Source # | |||||
Defined in AWS.Lambda.Events.Kafka type Rep Timestamp = D1 ('MetaData "Timestamp" "AWS.Lambda.Events.Kafka" "hal-1.1-ZuKBgFp1BmBIjVnwMMR99" 'False) (C1 ('MetaCons "NoTimestampType" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "CreateTime" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 UTCTime)) :+: C1 ('MetaCons "LogAppendTime" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 UTCTime)))) |
Internal
unparseTimestamp :: KeyValue e kv => Timestamp -> [kv] Source #
int64ToUTCTime :: Int64 -> UTCTime Source #
utcTimeToInt64 :: UTCTime -> Int64 Source #