Feed events to Kafka Connect
Mosaic enables you to feed events into your SIEM (Security Information and Event Management) and log collection systems. To give you better control over the granularity of data attributes sent to downstream systems, Mosaic lets you create event streams, including multiple streams per event type, and collect events for each stream separately. To learn more, see About event streaming.
To feed data, Mosaic uses the Confluent HTTP Source Connector to fetch data from the Mosaic Event streaming API endpoint and write it to a Kafka topic.
How it works
The connector polls the specified HTTP endpoint at regular intervals and writes the response to a Kafka topic. Learn more at HTTP Source Connector.
Step 1: Configure a management app
In your Mosaic tenant, configure a management app. Give the app a suitable name, for example, KafkaLogStream.
Note
After saving the management app, open it again and note the Client ID and Client Secret values. You’ll need these parameters to configure the Connector.
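The Event streaming API calls in the next step are authorized with a client access token issued to this management app. Below is a minimal sketch of fetching one with node-fetch; it assumes the standard Mosaic client-credentials token endpoint (/oidc/token), so adjust it if your tenant uses a different token URL.
import fetch from 'node-fetch';

async function getAccessToken() {
  // Assumption: client access tokens are issued by the /oidc/token endpoint
  // using the OAuth2 client_credentials grant.
  const resp = await fetch('https://api.transmitsecurity.io/oidc/token', {
    method: 'POST',
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
    body: new URLSearchParams({
      grant_type: 'client_credentials',
      client_id: '<CLIENT_ID>',        // Client ID of the management app
      client_secret: '<CLIENT_SECRET>' // Client secret of the management app
    })
  });

  const data = await resp.json();
  return data.access_token; // Use as the Bearer token in the calls below
}

getAccessToken().then(console.log);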
Step 2: Create event streams
Before you can start feeding events to Kafka Connect, you have to enable data collection and create event streams in Mosaic.
- Enable event collection. Specify the type of events you want to collect.
import fetch from 'node-fetch';

async function run() {
  const query = new URLSearchParams({
    type: '<TYPE>', // Event type. One of cis, admin, risk, verify
  }).toString();

  const resp = await fetch(
    `https://api.transmitsecurity.io/activities/v1/activities/start-collect?${query}`,
    {
      method: 'PUT',
      headers: {
        Authorization: 'Bearer <YOUR_TOKEN_HERE>' // Client access token
      }
    }
  );

  const data = await resp.text();
  console.log(data);
}

run();
- Create an event stream. Make sure to provide an ID to identify the stream. The stream ID must be a single string without spaces and unique for each stream. To check that the stream returns events, see the verification sketch after this list.
import fetch from 'node-fetch';

async function run() {
  const query = new URLSearchParams({
    type: '<TYPE>',          // Event type. One of cis, admin, risk, verify
    stream_id: '<STREAM_ID>' // Unique stream ID, without spaces
  }).toString();

  const resp = await fetch(
    `https://api.transmitsecurity.io/activities/v1/activities/stream?${query}`,
    {
      method: 'PUT',
      headers: {
        Authorization: 'Bearer <YOUR_TOKEN_HERE>' // Client access token
      }
    }
  );

  const data = await resp.text();
  console.log(data);
}

run();
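Optionally, before configuring the Connector, you can verify that the stream returns events by calling the Collect events endpoint directly. This is a sketch only: it assumes the collect endpoint (the same URL the Connector polls in Step 3) accepts a plain GET request with the type and stream_id parameters.
import fetch from 'node-fetch';

async function run() {
  const query = new URLSearchParams({
    type: '<TYPE>',          // Event type. One of cis, admin, risk, verify
    stream_id: '<STREAM_ID>' // The stream ID created above
  }).toString();

  // Assumption: the Collect events endpoint is polled with a GET request.
  const resp = await fetch(
    `https://api.transmitsecurity.io/activities/v1/activities/collect?${query}`,
    {
      headers: {
        Authorization: 'Bearer <YOUR_TOKEN_HERE>' // Client access token
      }
    }
  );

  const data = await resp.text();
  console.log(data); // Prints the collected events (may be empty if no events occurred yet)
}

run();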
Step 3: Configure the Connector
The Connector has to be configured to obtain events from the Collect events endpoint. Update the activities-to-kafka-connector.json configuration file as described below:
Setting | Description |
---|---|
URL Configuration | The URL points to the Mosaic Event streaming API endpoint: "url": "https://api.transmitsecurity.io/activities/v1/activities/collect?type=<type>&stream_id=<stream_id>". Provide the type (event type) and stream_id (stream ID) parameters that match the stream you created in Step 2. |
Authentication | The connector uses OAuth2 authentication. More details: HTTP Source Connector Authentication configuration. Provide the client ID and client secret of the Mosaic management app (see Step 1). |
Additional settings | Configure the remaining connector settings (for example, the target Kafka topic and polling interval) as described in the HTTP Source Connector documentation. |
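For reference, the sketch below writes a minimal activities-to-kafka-connector.json in the {"name": ..., "config": ...} format expected by the Kafka Connect REST API used in Step 4. Only url, oauth2.client.id, and oauth2.client.secret come from this guide; the connector.class value and any remaining properties (such as the OAuth2 token URL, target topic, and polling interval) are assumptions to confirm against the HTTP Source Connector documentation.
import { writeFile } from 'node:fs/promises';

async function run() {
  const connector = {
    name: 'activities-connector', // Connector name used by the status check in Step 5
    config: {
      'connector.class': 'io.confluent.connect.http.HttpSourceConnector', // Assumed class name; verify in the Confluent docs
      'url': 'https://api.transmitsecurity.io/activities/v1/activities/collect?type=<TYPE>&stream_id=<STREAM_ID>',
      'oauth2.client.id': '<CLIENT_ID>',        // Client ID of the management app (Step 1)
      'oauth2.client.secret': '<CLIENT_SECRET>' // Client secret of the management app (Step 1)
      // Add the OAuth2 token URL, target topic, and polling settings per the HTTP Source Connector docs.
    }
  };

  await writeFile('/tmp/activities-to-kafka-connector.json', JSON.stringify(connector, null, 2));
  console.log('Wrote /tmp/activities-to-kafka-connector.json');
}

run();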
Step 4: Run the Connector using Docker
To connect to your existing Kafka deployment, run the Connector using Docker. Use the confluentinc/kafka-connect-http-source:latest image.
- Install the http-source and http plugins:
confluent-hub install --no-prompt confluentinc/kafka-connect-http-source:latest && confluent-hub install --no-prompt confluentinc/kafka-connect-http:latest
- Launch the Kafka Connect worker:
/etc/confluent/docker/run &
- Wait for initialization, and then install the activities-to-kafka-connector:
curl -s -X POST -H "Content-Type: application/json" --data @/tmp/activities-to-kafka-connector.json http://localhost:8083/connectors
Note
You should be able to receive data at this stage.
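You can also confirm that the connector was registered by listing the connectors through the Kafka Connect REST API (this assumes the default REST port 8083 used above); the response should include activities-connector:
curl -s http://localhost:8083/connectors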
Step 5: Test the Connector
In the attached docker-compose.yml file, you can see a full example for local testing:
- the activities-connect Docker container
- Kafka and ZooKeeper
- a kafka-consumer for printing the data
Before running, make sure to update the following values:
- "url": "<EVENT_STREAM_URL>"
- "oauth2.client.id": "<CLIENT_ID>"
- "oauth2.client.secret": "<CLIENT_SECRET>"
To check the status of the connector, run the following Docker command:
docker exec activities-connect curl http://localhost:8083/connectors/activities-connector/status
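A healthy connector reports a RUNNING state for both the connector and its tasks. The response looks roughly like the following (worker IDs and the number of tasks depend on your deployment):
{
  "name": "activities-connector",
  "connector": { "state": "RUNNING", "worker_id": "localhost:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "localhost:8083" } ],
  "type": "source"
}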