This project implements the ingestion of streaming data from fleet management sources.
It is built with Confluent, Kafka, ClickHouse, Preset and Python.
The objective was to extract data that answers the following questions:
- What is the current location of a given car?
- Detect patterns in fleet movement
- Detect excessive usage of cars
- Detect abnormal engine behaviour
- Report driver activity
- Other machine learning applications:
  - Predictive maintenance
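As an illustration of the first question, the sketch below keeps the last reported position of each vehicle. It is a minimal sketch, not the project's actual query. It assumes location records shaped like the fleet_mgmt_location source (a vehicle_id plus a nested location with latitude and longitude) that arrive in event order; the function name latest_locations and the sample values are hypothetical.

```python
from typing import Dict, Iterable, Tuple


def latest_locations(records: Iterable[dict]) -> Dict[int, Tuple[float, float]]:
    """Return the last reported (latitude, longitude) for each vehicle_id.

    Records are assumed to arrive in event order, so a later record for the
    same vehicle overwrites the earlier position.
    """
    current: Dict[int, Tuple[float, float]] = {}
    for record in records:
        location = record["location"]
        current[record["vehicle_id"]] = (location["latitude"], location["longitude"])
    return current


# Hypothetical sample records shaped like the fleet_mgmt_location source.
events = [
    {"vehicle_id": 1001, "location": {"latitude": 37.416834, "longitude": -121.975002}},
    {"vehicle_id": 1002, "location": {"latitude": 37.3, "longitude": -121.9}},
    {"vehicle_id": 1001, "location": {"latitude": 37.42, "longitude": -121.97}},
]
print(latest_locations(events))
# {1001: (37.42, -121.97), 1002: (37.3, -121.9)}
```

In ClickHouse the same idea would be a "latest row per vehicle" query; the Python version only shows the logic.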
The application and its final reports can be used by fleet managers, analysts or the fleet maintenance department.
| Author | GitHub profile |
|---|---|
| Pawel Dymek | pdymek |
The solution was built with the help of:
- Kafka Connect Datagen docs - https://github.com/confluentinc/kafka-connect-datagen/
- Resources provided during the DEC (Data Engineer Camp) courses - https://dataengineercamp.com/
The data comes from the following three Datagen sources:
- fleet_mgmt_description
- fleet_mgmt_location
- fleet_mgmt_sensors
List of sources and ingested fields:
The data is organised as nested JSON.
Sample of the .json schema for the fleet_mgmt_location source:
```json
{
  "namespace": "fleet_mgmt",
  "name": "fleet_mgmt_location",
  "type": "record",
  "fields": [
    {
      "name": "vehicle_id",
      "type": {
        "type": "int",
        "arg.properties": {
          "range": {
            "min": 1000,
            "max": 9999
          }
        }
      }
    },
    {
      "name": "location",
      "type": {
        "type": "record",
        "name": "location",
        "fields": [
          {
            "name": "latitude",
            "type": "double"
          },
          {
            "name": "longitude",
            "type": "double"
          }
        ]
      },
      "arg.properties": {
        "options": [
          {
            "latitude": 37.416834,
            "longitude": -121.975002
          },
          {
          ...
```
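Because each message nests the coordinates inside a location record, a flat table is easier to query once latitude and longitude are unpacked into their own columns. The Python sketch below shows that step under stated assumptions: message values are JSON-encoded (matching 'output.data.format' = 'JSON' in the connector statement), and the function name flatten_location and the flat (vehicle_id, latitude, longitude) row layout are hypothetical.

```python
import json
from typing import Tuple


def flatten_location(message: bytes) -> Tuple[int, float, float]:
    """Turn one JSON-encoded fleet_mgmt_location message into a flat row.

    The nested "location" record is unpacked into separate latitude and
    longitude values; any extra top-level fields are ignored.
    """
    record = json.loads(message)  # json.loads accepts bytes directly
    location = record["location"]
    return record["vehicle_id"], location["latitude"], location["longitude"]


msg = b'{"vehicle_id": 1234, "location": {"latitude": 37.416834, "longitude": -121.975002}}'
print(flatten_location(msg))  # (1234, 37.416834, -121.975002)
```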
Sample ksqlDB code for creating the source connector:
```sql
CREATE SOURCE CONNECTOR IF NOT EXISTS LOCATION WITH (
'connector.class' = 'DatagenSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'kafka.topic' = 'fleet_mgmt_location',
'quickstart' = 'fleet_mgmt_location',
'maxInterval' = '10',
'tasks.max' = '1',
'output.data.format' = 'JSON'
);
```
Three Kafka topics for fleet data consumption:
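The three topics correspond to the fleet_mgmt_description, fleet_mgmt_location and fleet_mgmt_sensors sources. Assuming the other two connectors differ from the LOCATION example only in their name, kafka.topic and quickstart (this document does not show them), a small Python helper can generate all three statements. The helper source_connector_sql and the connector names DESCRIPTION and SENSORS are hypothetical, and the API key placeholders are deliberately left unfilled.

```python
QUICKSTARTS = ("fleet_mgmt_description", "fleet_mgmt_location", "fleet_mgmt_sensors")


def source_connector_sql(name: str, quickstart: str) -> str:
    """Build a CREATE SOURCE CONNECTOR statement for one Datagen quickstart.

    Settings mirror the LOCATION example; only the topic and quickstart change.
    """
    props = {
        "connector.class": "DatagenSource",
        "kafka.api.key": "<my-kafka-api-key>",
        "kafka.api.secret": "<my-kafka-api-secret>",
        "kafka.topic": quickstart,
        "quickstart": quickstart,
        "maxInterval": "10",
        "tasks.max": "1",
        "output.data.format": "JSON",
    }
    body = ",\n".join(f"  '{key}' = '{value}'" for key, value in props.items())
    return f"CREATE SOURCE CONNECTOR IF NOT EXISTS {name} WITH (\n{body}\n);"


for quickstart in QUICKSTARTS:
    # Hypothetical naming rule: "fleet_mgmt_location" -> "LOCATION"
    connector_name = quickstart.split("_", 2)[-1].upper()
    print(source_connector_sql(connector_name, quickstart))
    print()
```

Generating the statements keeps the shared settings in one place, so a change such as a different maxInterval is applied to every connector at once.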

Output tables in the form of precalculated report views or SQL queries:

Top 10 sensor usage by driver:
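The logic behind such a ranking can be sketched in Python under stated assumptions. Here "usage" is taken to mean the number of sensor readings, readings from fleet_mgmt_sensors are matched to drivers through vehicle_id, and the vehicle-to-driver mapping is assumed to come from fleet_mgmt_description (its field names are not shown in this document). The function name and sample data are hypothetical; in the project itself this aggregation is presumably one of the report views or SQL queries.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def top_drivers_by_readings(
    sensor_events: Iterable[dict],
    driver_by_vehicle: Dict[int, str],
    n: int = 10,
) -> List[Tuple[str, int]]:
    """Count sensor readings per driver and return the n most active drivers.

    Readings are matched to drivers through vehicle_id; readings from
    vehicles with no known driver are skipped.
    """
    counts: Counter = Counter()
    for event in sensor_events:
        driver = driver_by_vehicle.get(event["vehicle_id"])
        if driver is not None:
            counts[driver] += 1
    return counts.most_common(n)


# Hypothetical mapping, e.g. built from fleet_mgmt_description records.
drivers = {1001: "Driver A", 1002: "Driver B"}
readings = [{"vehicle_id": 1001}, {"vehicle_id": 1002}, {"vehicle_id": 1001}, {"vehicle_id": 9999}]
print(top_drivers_by_readings(readings, drivers))
# [('Driver A', 2), ('Driver B', 1)]
```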

Usage distribution on a calendar:
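A calendar view starts from a count of events per day. The sketch below assumes each event carries a Kafka record timestamp in epoch milliseconds and buckets events by UTC calendar day; the function name usage_per_day and the choice of UTC are assumptions, not taken from the project.

```python
from collections import Counter
from datetime import date, datetime, timezone
from typing import Dict, Iterable


def usage_per_day(timestamps_ms: Iterable[int]) -> Dict[date, int]:
    """Count events per UTC calendar day from epoch-millisecond timestamps."""
    counts = Counter(
        datetime.fromtimestamp(ts / 1000, tz=timezone.utc).date() for ts in timestamps_ms
    )
    # Sort by day so the result reads like a calendar.
    return dict(sorted(counts.items()))


# 1_700_000_000_000 ms is 2023-11-14 22:13:20 UTC.
sample = [1_700_000_000_000, 1_700_003_600_000, 1_700_007_200_000]
print(usage_per_day(sample))
# {datetime.date(2023, 11, 14): 2, datetime.date(2023, 11, 15): 1}
```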
Check that the ClickHouse connection works and that the required tables exist by running the following from the command line:
```shell
pytest tests
```
The tests also run automatically via GitHub Actions on every pull request to the main branch.
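A table check of this kind can be written so that the query runner is injected, which lets the logic run without a live server. This is a minimal pytest-style sketch, not the project's actual tests: the table names are hypothetical (this document does not list them), and run_query stands in for a wrapper around whichever ClickHouse client the tests use. It relies on ClickHouse's SHOW TABLES statement, which returns one table name per row.

```python
from typing import Callable, Iterable, List, Sequence

# Hypothetical table names: replace with the tables the project creates.
REQUIRED_TABLES = ("fleet_mgmt_description", "fleet_mgmt_location", "fleet_mgmt_sensors")


def missing_tables(
    run_query: Callable[[str], Sequence[Sequence[str]]],
    required: Iterable[str] = REQUIRED_TABLES,
) -> List[str]:
    """Return the required tables that SHOW TABLES does not list."""
    existing = {row[0] for row in run_query("SHOW TABLES")}
    return [table for table in required if table not in existing]


def test_required_tables_exist():
    # Fake runner standing in for a live ClickHouse connection.
    def fake_run_query(sql: str):
        assert sql == "SHOW TABLES"
        return [(name,) for name in REQUIRED_TABLES]

    assert missing_tables(fake_run_query) == []
```

Against a real server, the same missing_tables call would receive a runner that executes the query through the ClickHouse client, so a failed connection surfaces as an error and a missing table as a non-empty list.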




