Merge pull request #40 from corley/feature/inline-protocol

Optimized inline protocol conversion
This commit is contained in:
Walter Dal Mut
2015-06-27 14:26:40 +02:00
11 changed files with 177 additions and 104 deletions
+3 -3
View File
@@ -284,9 +284,9 @@ The impact of message to inline protocol conversion is:
Corley\Benchmarks\InfluxDB\MessageToInlineProtocolEvent
Method Name Iterations Average Time Ops/second
---------------------------------------------------- ------------ -------------- -------------
convertMessageToInlineProtocolWithNoTags : [10,000 ] [0.0000237422466] [42,119.01324]
convertMessageToInlineProtocolWithGlobalTags : [10,000 ] [0.0000306700468] [32,605.10185]
convertMessageToInlineProtocolWithDifferentTagLevels: [10,000 ] [0.0000343942404] [29,074.63543]
convertMessageToInlineProtocolWithNoTags : [10,000 ] [0.0000343696594] [29,095.42942]
convertMessageToInlineProtocolWithGlobalTags : [10,000 ] [0.0000437165260] [22,874.64469]
convertMessageToInlineProtocolWithDifferentTagLevels: [10,000 ] [0.0000493728638] [20,254.04086]
```
## FAQ
@@ -23,6 +23,8 @@ class AdapterEvent extends AthleticEvent
$options->setDatabase("tcp.test");
$client = new Client(new GuzzleAdapter(new HttpClient(), $options));
$client->createDatabase("tcp.test");
$client->createDatabase("udp.test");
$this->httpClient = $client;
@@ -2,31 +2,15 @@
namespace Corley\Benchmarks\InfluxDB;
use Athletic\AthleticEvent;
use InfluxDB\Adapter\UdpAdapter;
use InfluxDB\Options;
class MessageToInlineProtocolEvent extends AthleticEvent
{
private $method;
private $object;
public function setUp()
{
$object = new UdpAdapter(new Options());
$reflection = new \ReflectionClass(get_class($object));
$method = $reflection->getMethod("serialize");
$method->setAccessible(true);
$this->method = $method;
$this->object = $object;
}
/**
* @iterations 10000
*/
public function convertMessageToInlineProtocolWithNoTags()
{
$this->method->invokeArgs($this->object, [
\InfluxDB\Adapter\message_to_inline_protocol(
[
"points" => [
[
@@ -38,7 +22,7 @@ class MessageToInlineProtocolEvent extends AthleticEvent
],
]
]
]);
);
}
/**
@@ -46,7 +30,7 @@ class MessageToInlineProtocolEvent extends AthleticEvent
*/
public function convertMessageToInlineProtocolWithGlobalTags()
{
$this->method->invokeArgs($this->object, [
\InfluxDB\Adapter\message_to_inline_protocol(
[
"tags" => [
"dc" => "eu-west-1",
@@ -61,7 +45,7 @@ class MessageToInlineProtocolEvent extends AthleticEvent
],
]
]
]);
);
}
/**
@@ -69,7 +53,7 @@ class MessageToInlineProtocolEvent extends AthleticEvent
*/
public function convertMessageToInlineProtocolWithDifferentTagLevels()
{
$this->method->invokeArgs($this->object, [
\InfluxDB\Adapter\message_to_inline_protocol(
[
"tags" => [
"dc" => "eu-west-1",
@@ -87,6 +71,6 @@ class MessageToInlineProtocolEvent extends AthleticEvent
],
]
]
]);
);
}
}
+2 -1
View File
@@ -35,7 +35,8 @@
"psr-4": {
"InfluxDB\\": ["./src/"],
"Corley\\": ["./benchmarks/"]
}
},
"files": ["src/Adapter/helpers.php"]
},
"autoload-dev": {
"psr-4": {
Generated
+1 -1
View File
@@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file",
"This file is @generated automatically"
],
"hash": "ea197cdcb24cfde2b668ae1750adbfc0",
"hash": "6378134d2378fee11d7741fb4343074d",
"packages": [
{
"name": "guzzlehttp/guzzle",
+8 -60
View File
@@ -8,7 +8,12 @@ class UdpAdapter extends AdapterAbstract
public function send(array $message)
{
$message = array_replace_recursive($this->getMessageDefaults(), $message);
$message = $this->serialize($message);
if (array_key_exists("tags", $message)) {
$message["tags"] = array_replace_recursive($this->getOptions()->getTags(), $message["tags"]);
}
$message = message_to_inline_protocol($message);
$this->write($message);
}
@@ -17,8 +22,8 @@ class UdpAdapter extends AdapterAbstract
{
// Create a handler in order to handle the 'Host is down' message
set_error_handler(function() {
// Suppress the error, this is the UDP adapter and if we can't send
// it then we shouldn't inturrupt their application.
// Suppress the error, this is the UDP adapter and if we can't send
// it then we shouldn't inturrupt their application.
});
$socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
@@ -28,61 +33,4 @@ class UdpAdapter extends AdapterAbstract
// Remove our error handler.
restore_error_handler();
}
private function serialize(array $message)
{
$tags = $this->getOptions()->getTags();
if (array_key_exists("tags", $message)) {
$tags = array_replace_recursive($tags, $message["tags"]);
}
$unixepoch = $this->generateTimeInNanoSeconds();
if (array_key_exists("time", $message)) {
$dt = new DateTime($message["time"]);
$unixepoch = (int)($dt->format("U") * 1e9);
}
$lines = [];
foreach ($message["points"] as $point) {
if (array_key_exists("tags", $point)) {
$tags = array_replace_recursive($tags, $point["tags"]);
}
if (!$tags) {
$lines[] = trim(
sprintf(
"%s %s %d",
$point["measurement"], $this->toKeyValue($point["fields"], true), $unixepoch
)
);
} else {
$lines[] = trim(
sprintf(
"%s,%s %s %d",
$point["measurement"], $this->toKeyValue($tags), $this->toKeyValue($point["fields"], true), $unixepoch
)
);
}
}
return implode("\n", $lines);
}
protected function generateTimeInNanoSeconds()
{
return (int)(microtime(true) * 1e9);
}
protected function toKeyValue(array $elems, $escape = false)
{
$list = [];
foreach ($elems as $key => $value) {
if ($escape && is_string($value)) {
$value = "\"{$value}\"";
}
$list[] = sprintf("%s=%s", $key, $value);
}
return implode(",", $list);
}
}
+52
View File
@@ -0,0 +1,52 @@
<?php
namespace InfluxDB\Adapter;
use DateTime;
function message_to_inline_protocol(array $message)
{
$unixepoch = (int)(microtime(true) * 1e9);
if (array_key_exists("time", $message)) {
$dt = new DateTime($message["time"]);
$unixepoch = (int)($dt->format("U") * 1e9);
}
$lines = [];
foreach ($message["points"] as $point) {
$tags = array_key_exists("tags", $message) ? $message["tags"] : [];
if (array_key_exists("tags", $point)) {
$tags = array_replace_recursive($tags, $point["tags"]);
}
if (!$tags) {
$lines[] = trim(
sprintf(
"%s %s %d",
$point["measurement"], list_to_string($point["fields"], true), $unixepoch
)
);
} else {
$lines[] = trim(
sprintf(
"%s,%s %s %d",
$point["measurement"], list_to_string($tags), list_to_string($point["fields"], true), $unixepoch
)
);
}
}
return implode("\n", $lines);
}
function list_to_string(array $elements, $escape = false)
{
array_walk($elements, function(&$value, $key) use ($escape) {
if ($escape && is_string($value)) {
$value = "\"{$value}\"";
}
$value = "{$key}={$value}";
});
return implode(",", $elements);
}
+15
View File
@@ -0,0 +1,15 @@
<?php
namespace InfluxDB;
function list_to_string(array $elements, $escape = false)
{
array_walk($elements, function(&$value, $key) use ($escape) {
if ($escape && is_string($value)) {
$value = "\"{$value}\"";
}
$value = "{$key}={$value}";
});
return implode(",", $elements);
}
@@ -39,6 +39,7 @@ class UdpAdapterTest extends InfluxDBTestCase
"measurement" => "mem",
"fields" => [
"value" => 1233,
"with_string" => "this is a string",
],
],
],
@@ -49,5 +50,6 @@ class UdpAdapterTest extends InfluxDBTestCase
$this->assertSerieExists("udp.test", "mem");
$this->assertSerieCount("udp.test", "mem", 1);
$this->assertValueExistsInSerie("udp.test", "mem", "value", 1233);
$this->assertValueExistsInSerie("udp.test", "mem", "with_string", "this is a string");
}
}
+23
View File
@@ -0,0 +1,23 @@
<?php
namespace InfluxDB\Adapter;
class HelpersTest extends \PHPUnit_Framework_TestCase
{
/**
* @dataProvider getElements
*/
public function testListToInlineValues($message, $result, $escape)
{
$this->assertEquals($result, list_to_string($message, $escape));
}
public function getElements()
{
return [
[["one" => "two"], "one=two", false],
[["one" => "two"], "one=\"two\"", true],
[["one" => "two", "three" => "four"], "one=two,three=four", false],
[["one" => "two", "three" => "four"], "one=\"two\",three=\"four\"", true],
];
}
}
+63 -17
View File
@@ -17,13 +17,15 @@ class UdpAdapterTest extends \PHPUnit_Framework_TestCase
public function testRewriteMessages($input, $response)
{
$object = new UdpAdapter(new Options());
$reflection = new \ReflectionClass(get_class($object));
$method = $reflection->getMethod("serialize");
$method->setAccessible(true);
$object = $this->getMockBuilder("InfluxDB\Adapter\UdpAdapter")
->setConstructorArgs([new Options()])
->setMethods(["write"])
->getMock();
$object->expects($this->once())
->method("write")
->with($response);
$message = $method->invokeArgs($object, [$input]);
$this->assertEquals($response, $message);
$object->send($input);
}
public function getMessages()
@@ -43,6 +45,21 @@ class UdpAdapterTest extends \PHPUnit_Framework_TestCase
],
"cpu value=1 1257894000000000000"
],
[
[
"time" => "2009-11-10T23:00:00Z",
"points" => [
[
"measurement" => "cpu",
"fields" => [
"value" => 1,
"string" => "escape",
],
],
],
],
"cpu value=1,string=\"escape\" 1257894000000000000"
],
[
[
"tags" => [
@@ -115,7 +132,7 @@ EOF
$adapter->expects($this->once())
->method("write")
->with("udp.test mark=\"element\" 1245");
->with($this->matchesRegularExpression("/udp.test mark=\"element\" \d+/i"));
$adapter->send([
"points" => [
@@ -146,11 +163,11 @@ EOF
$adapter->expects($this->once())
->method("write")
->with(<<<EOF
mem free=712423 1245
cpu cpu=18.12 1245
->with($this->matchesRegularExpression(<<<EOF
/mem free=712423 \d+
cpu cpu=18.12 \d+/i
EOF
);
));
$adapter->send([
"points" => [
@@ -170,6 +187,35 @@ EOF
]);
}
/**
* @group udp
*/
public function testFillWithGlobalTags()
{
$options = (new Options())
->setDatabase("test")
->setTags(["dc" => "eu-west"]);
$adapter = $this->getMockBuilder("InfluxDB\\Adapter\\UdpAdapter")
->setConstructorArgs([$options])
->setMethods(["write"])
->getMock();
$adapter->expects($this->once())
->method("write")
->with($this->matchesRegularExpression("/mem,dc=eu-west free=712423 \d+/i"));
$adapter->send([
"points" => [
[
"measurement" => "mem",
"fields" => [
"free" => 712423,
],
],
]
]);
}
/**
* @group udp
*/
@@ -189,10 +235,10 @@ EOF
$adapter->expects($this->once())
->method("write")
->with(<<<EOF
mem,dc=eu-west,region=eu-west-1 free=712423 1245
->with($this->matchesRegularExpression(<<<EOF
/mem,dc=eu-west,region=eu-west-1 free=712423 \d+/i
EOF
);
));
$adapter->send([
"tags" => [
@@ -228,10 +274,10 @@ EOF
$adapter->expects($this->once())
->method("write")
->with(<<<EOF
mem,dc=eu-west,region=eu-west-1,location=ireland free=712423 1245
->with($this->matchesRegularExpression(<<<EOF
/mem,dc=eu-west,region=eu-west-1,location=ireland free=712423 \d+/i
EOF
);
));
$adapter->send([
"tags" => [