Raccogliere i log di Nagios XI
Questo documento spiega come importare i log di Nagios XI in Google Security Operations utilizzando Google Cloud Storage V2.
Nagios XI è una soluzione completa di monitoraggio dell'infrastruttura che fornisce il monitoraggio di server, reti, applicazioni e servizi. Monitora lo stato di host e servizi, le metriche sul rendimento e genera avvisi per i problemi dell'infrastruttura IT tramite la sua API REST.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Un'istanza Google SecOps
- Un progetto GCP con le API Cloud Storage, Cloud Run, Pub/Sub e Cloud Scheduler abilitate
- Autorizzazioni per creare e gestire bucket GCS
- Autorizzazioni per gestire le policy IAM nei bucket GCS
- Autorizzazioni per creare servizi Cloud Run, argomenti Pub/Sub e job Cloud Scheduler
- Accesso con privilegi a Nagios XI con autorizzazioni di gestione degli utenti
- Nagios XI versione 5 o successive (supporto API REST)
Configurare l'accesso all'API Nagios XI
Per consentire a Google SecOps di recuperare i dati di monitoraggio, devi creare un account utente con accesso API e autorizzazioni di sola lettura.
Creare un utente con autorizzazione di sola lettura con accesso API
- Accedi all'interfaccia web di Nagios XI con privilegi di amministratore.
- Vai ad Amministrazione > Gestisci utenti.
- Fai clic su Aggiungi nuovo utente.
- Fornisci i seguenti dettagli di configurazione:
- Nome utente: inserisci un nome descrittivo (ad esempio,
chronicle-integration) - Password: inserisci una password sicura
- Nome: inserisci
Chronicle Integration User - Indirizzo email: inserisci un indirizzo email valido
- Nome utente: inserisci un nome descrittivo (ad esempio,
- Nella sezione Impostazioni di sicurezza, configura quanto segue:
- Livello di autorizzazione: seleziona Utente.
- Può vedere tutti gli host e i servizi: seleziona questa opzione
- Accesso di sola lettura: seleziona questa opzione
- Accesso API: seleziona questa opzione.
- Deseleziona le seguenti opzioni:
- Forza modifica password al prossimo accesso
- Email User Account Information
- Crea come contatto di monitoraggio
- Assicurati che l'opzione Account abilitato sia selezionata.
- Fai clic su Aggiungi utente.
Recuperare la chiave API
- Nella pagina Gestisci utenti, fai clic sull'account utente (
chronicle-integration). - Nella pagina delle impostazioni dell'account utente, individua il campo Chiave API.
Copia il valore della chiave API.
Verifica le autorizzazioni
Per verificare che l'account disponga delle autorizzazioni richieste:
- Accedi a Nagios XI con l'account utente
chronicle-integration. - Vai a Home > Stato host.
- Se riesci a visualizzare tutti gli host e i servizi monitorati, disponi delle autorizzazioni necessarie.
- Se non riesci a visualizzare host o servizi, contatta l'amministratore per concedere l'autorizzazione Può visualizzare tutti gli host e i servizi.
Testare l'accesso API
Verifica le tue credenziali prima di procedere con l'integrazione:
# Replace with your actual values NAGIOS_HOST="https://your-nagios-server.example.com" API_KEY="your-api-key" # Test API access - query host status curl -k "${NAGIOS_HOST}/nagiosxi/api/v1/objects/hoststatus?apikey=${API_KEY}&pretty=1"Una risposta riuscita restituisce un oggetto JSON contenente informazioni sullo stato dell'host.
Creazione di un bucket Google Cloud Storage
- Vai alla console Google Cloud.
- Seleziona il tuo progetto o creane uno nuovo.
- Nel menu di navigazione, vai a Cloud Storage > Bucket.
- Fai clic su Crea bucket.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio nagios-xi-logs).Tipo di località Scegli in base alle tue esigenze (regione singola, a due regioni, multiregionale) Località Seleziona la posizione (ad esempio, us-central1).Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente) Controllo dell'accesso Uniforme (consigliato) Strumenti di protezione (Facoltativo) Attiva il controllo delle versioni degli oggetti o la policy di conservazione Fai clic su Crea.
Crea un account di servizio per la funzione Cloud Run
- Nella console Google Cloud, vai a IAM e amministrazione > Service Accounts.
- Fai clic su Crea account di servizio.
- Fornisci i seguenti dettagli di configurazione:
- Nome del service account: inserisci
nagios-logs-collector-sa - Descrizione service account: inserisci
Service account for Cloud Run function to collect Nagios XI logs
- Nome del service account: inserisci
- Fai clic su Crea e continua.
- Nella sezione Concedi a questo account di servizio l'accesso al progetto, aggiungi i seguenti ruoli:
- Fai clic su Seleziona un ruolo.
- Cerca e seleziona Amministratore oggetti di archiviazione.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Cloud Run Invoker.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Invoker di Cloud Functions.
- Fai clic su Continua.
- Fai clic su Fine.
Concedi autorizzazioni IAM sul bucket GCS
- Vai a Cloud Storage > Bucket.
- Fai clic sul nome del bucket (
nagios-xi-logs). - Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
- Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: inserisci l'email del account di servizio (
nagios-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Assegna i ruoli: seleziona Storage Object Admin.
- Aggiungi entità: inserisci l'email del account di servizio (
- Fai clic su Salva.
Crea argomento Pub/Sub
- Nella console GCP, vai a Pub/Sub > Argomenti.
- Fai clic su Crea argomento.
- Fornisci i seguenti dettagli di configurazione:
- ID argomento: inserisci
nagios-logs-trigger - Lascia invariate le altre impostazioni predefinite
- ID argomento: inserisci
- Fai clic su Crea.
Crea una funzione Cloud Run per raccogliere i log
La funzione Cloud Run verrà attivata dai messaggi Pub/Sub di Cloud Scheduler per recuperare i log dall'API REST di Nagios XI e scriverli in GCS.
- Nella console GCP, vai a Cloud Run.
- Fai clic su Crea servizio.
- Seleziona Funzione (usa un editor in linea per creare una funzione).
Nella sezione Configura, fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome servizio nagios-logs-collectorRegione Seleziona la regione corrispondente al tuo bucket GCS (ad esempio, us-central1)Tempo di esecuzione Seleziona Python 3.12 o versioni successive Nella sezione Trigger (facoltativo):
- Fai clic su + Aggiungi trigger.
- Seleziona Cloud Pub/Sub.
- In Seleziona un argomento Cloud Pub/Sub, scegli
nagios-logs-trigger. - Fai clic su Salva.
Nella sezione Autenticazione:
- Seleziona Richiedi autenticazione.
- Controlla Identity and Access Management (IAM).
Scorri verso il basso ed espandi Container, networking, sicurezza.
Vai alla scheda Sicurezza:
- Service account (Account di servizio): seleziona
nagios-logs-collector-sa
- Service account (Account di servizio): seleziona
Vai alla scheda Container:
- Fai clic su Variabili e secret.
- Fai clic su + Aggiungi variabile per ogni variabile di ambiente:
Nome variabile Valore di esempio Descrizione GCS_BUCKETnagios-xi-logsNome del bucket GCS GCS_PREFIXnagios-xiPrefisso per i file di log STATE_KEYnagios-xi/state.jsonPercorso file di stato NAGIOS_BASE_URLhttps://your-nagios-server.example.comURL di base di Nagios XI NAGIOS_API_KEYyour-api-keyChiave API Nagios XI MAX_RECORDS1000Numero massimo di record per endpoint per esecuzione PAGE_SIZE200Record per pagina API LOOKBACK_HOURS24Periodo di riferimento iniziale Nella sezione Variabili e secret, scorri verso il basso fino a Richieste:
- Timeout richiesta: inserisci
600secondi (10 minuti)
- Timeout richiesta: inserisci
Vai alla scheda Impostazioni:
- Nella sezione Risorse:
- Memoria: seleziona 512 MiB o superiore
- CPU: seleziona 1
- Nella sezione Risorse:
Nella sezione Scalabilità della revisione:
- Numero minimo di istanze: inserisci
0 - Numero massimo di istanze: inserisci
100
- Numero minimo di istanze: inserisci
Fai clic su Crea.
Attendi la creazione del servizio (1-2 minuti).
Dopo aver creato il servizio, si aprirà automaticamente l'editor di codice incorporato.
Aggiungi codice per la funzione
- Inserisci main nel campo Entry point (Punto di ingresso).
Nell'editor di codice incorporato, crea due file:
main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) storage_client = storage.Client() GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'nagios-xi') STATE_KEY = os.environ.get('STATE_KEY', 'nagios-xi/state.json') NAGIOS_BASE_URL = os.environ.get('NAGIOS_BASE_URL', '').rstrip('/') NAGIOS_API_KEY = os.environ.get('NAGIOS_API_KEY') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000')) PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '200')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) ENDPOINTS = [ 'hoststatus', 'servicestatus', 'statehistory', 'logentries', ] @functions_framework.cloud_event def main(cloud_event): if not all([GCS_BUCKET, NAGIOS_BASE_URL, NAGIOS_API_KEY]): print('Error: Missing required environment variables') return try: bucket = storage_client.bucket(GCS_BUCKET) state = load_state(bucket) now = datetime.now(timezone.utc) if isinstance(state, dict) and state.get('last_event_time'): try: last_val = state['last_event_time'] if last_val.endswith('Z'): last_val = last_val[:-1] + '+00:00' last_time = datetime.fromisoformat(last_val) last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") last_time = now - timedelta(hours=LOOKBACK_HOURS) else: last_time = now - timedelta(hours=LOOKBACK_HOURS) print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}") start_ts = int(last_time.timestamp()) end_ts = int(now.timestamp()) all_records = [] newest_time = None for endpoint in ENDPOINTS: records, endpoint_newest = fetch_nagios_endpoint( endpoint, start_ts, end_ts ) for record in records: record['_nagios_endpoint'] = endpoint all_records.extend(records) if endpoint_newest: if newest_time is None or endpoint_newest > newest_time: newest_time = endpoint_newest if not all_records: print("No new records found.") save_state(bucket, now.isoformat()) return timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/nagios_logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join( [json.dumps(r, ensure_ascii=False, default=str) for r in all_records] ) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(all_records)} records to gs://{GCS_BUCKET}/{object_key}") if newest_time: save_state(bucket, newest_time) else: save_state(bucket, now.isoformat()) print(f"Successfully processed {len(all_records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def fetch_nagios_endpoint(endpoint, start_ts, end_ts): base_url = f"{NAGIOS_BASE_URL}/nagiosxi/api/v1/objects/{endpoint}" records = [] newest_time = None start_index = 0 page_num = 0 backoff = 1.0 while True: page_num += 1 if len(records) >= MAX_RECORDS: print(f"{endpoint}: Reached max_records limit ({MAX_RECORDS})") break remaining = min(PAGE_SIZE, MAX_RECORDS - len(records)) params = [ f"apikey={NAGIOS_API_KEY}", f"starttime={start_ts}", f"endtime={end_ts}", f"records={remaining}", f"start={start_index}", ] url = f"{base_url}?{'&'.join(params)}" try: response = http.request('GET', url) if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"{endpoint}: Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"{endpoint}: HTTP Error {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") break data = json.loads(response.data.decode('utf-8')) record_count = data.get('recordcount', 0) result_key = get_result_key(endpoint) page_results = data.get(result_key, []) if not page_results: print(f"{endpoint}: No more results at offset {start_index}") break print(f"{endpoint} page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) for event in page_results: try: event_time = event.get('state_time') or event.get('status_update_time') or event.get('entry_time') if event_time: if newest_time is None or event_time > newest_time: newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") if len(page_results) < remaining: print(f"{endpoint}: Reached last page (returned {len(page_results)} < {remaining})") break start_index += len(page_results) except Exception as e: print(f"{endpoint}: Error fetching logs: {e}") break print(f"{endpoint}: Retrieved {len(records)} total records from {page_num} pages") return records, newest_time def get_result_key(endpoint): key_map = { 'hoststatus': 'hoststatus', 'servicestatus': 'servicestatus', 'statehistory': 'statehistory', 'logentries': 'logentry', } return key_map.get(endpoint, endpoint) def load_state(bucket): try: blob = bucket.blob(STATE_KEY) if blob.exists(): return json.loads(blob.download_as_text()) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, last_event_time_iso): try: state = { 'last_event_time': last_event_time_iso, 'last_run': datetime.now(timezone.utc).isoformat() } blob = bucket.blob(STATE_KEY) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}")requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0
Fai clic su Esegui il deployment per salvare la funzione ed eseguirne il deployment.
Attendi il completamento del deployment (2-3 minuti).
Crea job Cloud Scheduler
- Nella console di GCP, vai a Cloud Scheduler.
- Fai clic su Crea job.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome nagios-logs-collector-hourlyRegione Seleziona la stessa regione della funzione Cloud Run Frequenza 0 * * * *(ogni ora, all'ora)Fuso orario Seleziona il fuso orario (UTC consigliato) Tipo di target Pub/Sub Argomento Seleziona nagios-logs-triggerCorpo del messaggio {}(oggetto JSON vuoto)Fai clic su Crea.
Opzioni di frequenza di pianificazione
Scegli la frequenza in base al volume dei log e ai requisiti di latenza:
| Frequenza | Espressione cron | Caso d'uso |
|---|---|---|
| Ogni 5 minuti | */5 * * * * |
Volume elevato, bassa latenza |
| Ogni 15 minuti | */15 * * * * |
Volume medio |
| Ogni ora | 0 * * * * |
Standard (consigliato) |
| Ogni 6 ore | 0 */6 * * * |
Volume basso, elaborazione batch |
| Ogni giorno | 0 0 * * * |
Raccolta dei dati storici |
Testare l'integrazione
- Nella console Cloud Scheduler, trova il tuo job (
nagios-logs-collector-hourly). - Fai clic su Forza esecuzione per attivare il job manualmente.
- Attendi qualche secondo.
- Vai a Cloud Run > Servizi.
- Fai clic su
nagios-logs-collector. - Fai clic sulla scheda Log.
Verifica che la funzione sia stata eseguita correttamente. Cerca:
Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 hoststatus page 1: Retrieved X events servicestatus page 1: Retrieved X events statehistory page 1: Retrieved X events logentries page 1: Retrieved X events Wrote X records to gs://nagios-xi-logs/nagios-xi/nagios_logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X recordsVai a Cloud Storage > Bucket.
Fai clic su
nagios-xi-logs.Vai alla cartella
nagios-xi/.Verifica che sia stato creato un nuovo file
.ndjsoncon il timestamp corrente.
Se visualizzi errori nei log:
- HTTP 401: verifica che la variabile di ambiente
NAGIOS_API_KEYsia corretta e che l'utente abbia abilitato l'accesso API - HTTP 403: verifica che l'account utente disponga dell'autorizzazione Può visualizzare tutti gli host e i servizi
- HTTP 429: limitazione di frequenza: la funzione eseguirà automaticamente un nuovo tentativo con backoff
- Variabili di ambiente mancanti: verifica che tutte le variabili richieste siano impostate nella configurazione della funzione Cloud Run
Recuperare il account di servizio Google SecOps
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Fai clic su Configura un singolo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Nagios XI Logs GCS). - Seleziona Google Cloud Storage V2 come Tipo di origine.
- Seleziona Nagios come Tipo di log.
- Fai clic su Ottieni service account.
Viene visualizzata un'email del account di servizio univoca. Ad esempio:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopia questo indirizzo email per utilizzarlo nel passaggio successivo.
Fai clic su Avanti.
Specifica i valori per i seguenti parametri di input:
URL bucket di archiviazione: inserisci l'URI del bucket GCS con il percorso del prefisso:
gs://nagios-xi-logs/nagios-xi/
Opzione di eliminazione della fonte: seleziona l'opzione di eliminazione in base alle tue preferenze:
- Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test).
- Elimina file trasferiti: elimina i file dopo il trasferimento riuscito.
Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito.
Età massima del file: includi i file modificati nell'ultimo numero di giorni (il valore predefinito è 180 giorni)
Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset
Etichette di importazione: l'etichetta da applicare agli eventi di questo feed
Fai clic su Avanti.
Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Concedi le autorizzazioni IAM al account di servizio Google SecOps
- Vai a Cloud Storage > Bucket.
- Fai clic su
nagios-xi-logs. - Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
- Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: incolla l'email del account di servizio Google SecOps
- Assegna i ruoli: seleziona Visualizzatore oggetti Storage.
Fai clic su Salva.
Tabella di mappatura UDM
| Campo log | Mappatura UDM | Logic |
|---|---|---|
| src_ip | has_principal | Impostato su "true" se l'unione di src_ip a principal.ip e principal.asset.ip va a buon fine |
| src_ip | principal.asset.ip | Valore copiato direttamente |
| src_ip | principal.ip | Valore copiato direttamente |
| jobid | jobid_label.key | Impostato su "Jobid" |
| jobid | jobid_label.value | Valore copiato direttamente |
| jobid_label | principal.resource.attribute.labels | Unito da jobid_label |
| pid | principal.process.pid | Valore copiato direttamente |
| ent | metadata.product_event_type | Valore copiato direttamente |
| descrizione | metadata.description | Valore copiato direttamente |
| msg_ip | has_target | Impostato su "true" se l'unione di msg_ip a target.ip e target.asset.ip va a buon fine |
| msg_ip | target.asset.ip | Valore copiato direttamente |
| msg_ip | target.ip | Valore copiato direttamente |
| porta | target.port | Valore copiato direttamente e convertito in numero intero |
| colonna1 | principal.asset.hostname | Valore della colonna 1 se ent == "SERVICE ALERT", altrimenti della colonna 2 se ent == "SERVICE NOTIFICATION" |
| colonna1 | principal.hostname | Valore della colonna 1 se ent == "SERVICE ALERT", altrimenti della colonna 2 se ent == "SERVICE NOTIFICATION" |
| colonna1 | principal.user.user_display_name | Il valore viene copiato direttamente se ent == "SERVICE NOTIFICATION" |
| colonna2 | security_result.summary | Valore della colonna 3 se ent == "SERVICE ALERT" e la colonna 2 non è vuota, altrimenti dalla colonna 3 se ent == "SERVICE NOTIFICATION" |
| colonna3 | security_result.summary | Valore della colonna 3 se ent == "SERVICE ALERT" e la colonna 3 non è vuota, altrimenti dalla colonna 3 se ent == "SERVICE NOTIFICATION" |
| column4 | security_result.severity | Valore copiato direttamente se column4 in ["LOW", "MEDIUM", "HIGH", "CRITICAL"] e ent in ["SERVICE NOTIFICATION", "SERVICE ALERT"] |
| column4 | security_result.severity_details | Valore copiato direttamente se column4 non è in ["LOW", "MEDIUM", "HIGH", "CRITICAL"] ed ent in ["SERVICE NOTIFICATION", "SERVICE ALERT"] |
| has_principal | metadata.event_type | Imposta "NETWORK_CONNECTION" se has_principal e has_target sono true, altrimenti "STATUS_UPDATE" se has_principal è true, altrimenti "USER_UNCATEGORIZED" se has_principal_user è true, altrimenti "GENERIC_EVENT" |
| has_target | metadata.event_type | |
| has_principal_user | metadata.event_type | |
| security_result | event.idm.read_only_udm.security_result | Unito da security_result |
| metadati | event.idm.read_only_udm.metadata | Rinominato dai metadati |
| target | event.idm.read_only_udm.target | Rinominato dal target |
| entità | event.idm.read_only_udm.principal | Rinominato dall'entità |
| rete | event.idm.read_only_udm.network | Rinominato dalla rete |
| metadata.product_name | metadata.product_name | Imposta su "NAGIOS" |
| metadata.vendor_name | metadata.vendor_name | Imposta su "NAGIOS" |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.