处理监控数据

dev
Jing Li 2022-10-20 16:18:38 +08:00
parent 238cccda58
commit 98a7ada0b0
11 changed files with 292 additions and 138 deletions

View File

@ -5,11 +5,8 @@ namespace App\Console\Commands;
use App\Enums\DeviceType; use App\Enums\DeviceType;
use App\Models\Device; use App\Models\Device;
use App\Models\LinkosDeviceLog; use App\Models\LinkosDeviceLog;
use App\Models\MeteorologicalMonitoringLog; use App\Services\LinkosDeviceLogService;
use App\Models\SoilMonitoringLog;
use App\Models\WaterQualityMonitoringLog;
use Illuminate\Console\Command; use Illuminate\Console\Command;
use Illuminate\Support\Arr;
class LinkosDeviceLogArchiveCommand extends Command class LinkosDeviceLogArchiveCommand extends Command
{ {
@ -30,14 +27,15 @@ class LinkosDeviceLogArchiveCommand extends Command
/** /**
* Execute the console command. * Execute the console command.
* *
* @param \App\Services\LinkosDeviceLogService $linkosDeviceLogService
* @return int * @return int
*/ */
public function handle() public function handle(LinkosDeviceLogService $linkosDeviceLogService)
{ {
$devices = Device::whereIn('type', [DeviceType::Meteorological, DeviceType::Soil, DeviceType::WaterQuality])->get(); $devices = Device::whereIn('type', [DeviceType::Meteorological, DeviceType::Soil, DeviceType::WaterQuality])->get();
// 物联平台目前只有水质监测设备和气象监测设备 // 物联平台目前只有水质监测设备和气象监测设备
LinkosDeviceLog::orderBy('reported_at', 'asc')->lazy()->each(function ($log) use ($devices) { LinkosDeviceLog::orderBy('reported_at', 'asc')->lazy()->each(function ($log) use ($devices, $linkosDeviceLogService) {
if (empty($log->data)) { if (empty($log->data)) {
return; return;
} }
@ -48,122 +46,13 @@ class LinkosDeviceLogArchiveCommand extends Command
} }
match ($device->type) { match ($device->type) {
DeviceType::Soil => $this->handleSoildDeviceLog($device, $log), DeviceType::Soil => $linkosDeviceLogService->handleSoilMonitoringLog($device, $log->data, $log->reported_at),
DeviceType::WaterQuality => $this->handleWaterQualityDeviceLog($device, $log), DeviceType::WaterQuality => $linkosDeviceLogService->handleWaterQualityMonitoringLog($device, $log->data, $log->reported_at),
DeviceType::Meteorological => $this->handleMeteorologicalDeviceLog($device, $log), DeviceType::Meteorological => $linkosDeviceLogService->handleMeteorologicalMonitoringLog($device, $log->data, $log->reported_at),
}; };
} }
}); });
return Command::SUCCESS; return Command::SUCCESS;
} }
protected function handleSoildDeviceLog(Device $device, LinkosDeviceLog $log)
{
$attributes = [
'conductivity' => 'conductivity',
'soil_humidity' => 'humidity',
'soil_temperature' => 'temperature',
'nitrogen_content' => 'n',
'phosphorus_content' => 'p',
'potassium_content' => 'k',
];
if (! Arr::hasAny($log->data, array_keys($attributes))) {
return;
}
$monitoringLog = SoilMonitoringLog::firstOrCreate([
'device_id' => $device->id,
'monitored_at' => $log->reported_at->startOfHour(),
], [
'agricultural_base_id' => $device->agricultural_base_id,
]);
foreach ($attributes as $key => $attribute) {
if (! array_key_exists($key, $log->data)) {
continue;
}
$monitoringLog->{$attribute} = $log->data[$key];
}
$monitoringLog->save();
}
protected function handleWaterQualityDeviceLog(Device $device, LinkosDeviceLog $log)
{
$attributes = [
'conductivity' => 'conductivity',
'oxygen' => 'oxygen',
'chlorine' => 'chlorine',
'turbidity' => 'turbidity',
'temp' => 'temperature',
'ph' => 'ph',
];
if (! Arr::hasAny($log->data, array_keys($attributes))) {
return;
}
$monitoringLog = WaterQualityMonitoringLog::firstOrCreate([
'device_id' => $device->id,
'monitored_at' => $log->reported_at->startOfHour(),
], [
'agricultural_base_id' => $device->agricultural_base_id,
]);
foreach ($attributes as $key => $attribute) {
if (! array_key_exists($key, $log->data)) {
continue;
}
$monitoringLog->{$attribute} = $log->data[$key];
}
$monitoringLog->save();
}
protected function handleMeteorologicalDeviceLog(Device $device, LinkosDeviceLog $log)
{
$attributes = [
'wind_speed' => 'wind_speed',
'wind_power' => 'wind_power',
'wind_direction' => 'wind_direction',
'wind_degree' => 'wind_degree',
'box_humidity' => 'air_humidity',
'box_temperature' => 'air_temperature',
'box_pressure' => 'air_pressure',
'box_carbon' => 'co2',
'box_noise' => 'noise',
'box_illumination' => 'illumination',
'accumulate_rainfall' => 'accumulated_rainfall',
'current_rainfall' => 'current_rainfall',
'moment_rainfall' => 'moment_rainfall',
'day_rainfall' => 'day_rainfall',
'pm25_concentration' => 'pm25',
'pm10_concentration' => 'pm10',
];
if (! Arr::hasAny($log->data, array_keys($attributes))) {
return;
}
$monitoringLog = MeteorologicalMonitoringLog::firstOrCreate([
'device_id' => $device->id,
'monitored_at' => $log->reported_at->startOfHour(),
], [
'agricultural_base_id' => $device->agricultural_base_id,
]);
foreach ($attributes as $key => $attribute) {
if (! array_key_exists($key, $log->data)) {
continue;
}
$monitoringLog->{$attribute} = $log->data[$key];
}
$monitoringLog->save();
}
} }

View File

@ -46,7 +46,7 @@ class LinkosDeviceLogSyncCommand extends Command
do { do {
if ($lastDate === null) { if ($lastDate === null) {
$lastDate = Carbon::parse('2022-10-18'); $lastDate = Carbon::parse('2022-06-01');
} else { } else {
$lastDate->addDay(); $lastDate->addDay();
} }

View File

@ -2,15 +2,95 @@
namespace App\Http\Controllers\Callback; namespace App\Http\Controllers\Callback;
use App\Enums\DeviceStatus;
use App\Http\Controllers\Controller; use App\Http\Controllers\Controller;
use App\Models\Device;
use App\Services\LinkosDeviceLogService;
use Carbon\Carbon;
use Illuminate\Http\Request; use Illuminate\Http\Request;
use Illuminate\Support\Arr;
class LinkosController extends Controller class LinkosController extends Controller
{ {
public function __invoke(Request $request) public function __invoke(Request $request)
{ {
logger()->debug('linkos callback data', $request->input()); $input = $request->input();
return response()->json(); if (isset($input['notify_type'])) {
// 设备上线、离线通知
if ($input['notify_type'] === 'online_state_change') {
$this->handleOnlineStateChangeNotify($input);
}
} elseif (Arr::has($input, ['device_id', 'device_unit', 'device_category'])) {
$type = (int) ($input['type'] ?? 0);
switch ($type) {
case 0:
$this->handleDeviceDataNotify($input);
break;
}
}
return response()->json(['code' => 0, 'msg' => 'ok']);
}
/**
* 处理设备采集数据
*
* @param array $data
* @return void
*/
protected function handleDeviceDataNotify(array $data)
{
if (! is_array($deviceData = $data['data'] ?? [])) {
$deviceData = [];
}
$reportedAt = isset($data['timestamp'])
? Carbon::createFromTimestampMs($data['timestamp'])
: now();
if (! is_array($deviceData = $data['data'] ?? [])) {
$deviceData = [];
}
(new LinkosDeviceLogService())->create(
$data['device_id'],
$data['device_unit'],
$data['device_category'],
$deviceData,
$reportedAt,
);
}
/**
* 处理设备离线、上线通知
*
* @param array $data
* @return void
*/
protected function handleOnlineStateChangeNotify(array $data)
{
$items = $data['data'] ?? [];
if (! is_array($items)) {
return;
}
foreach ($items as $item) {
if (! Arr::has($item, ['device_id', 'online_state'])) {
continue;
}
switch ($item['online_state']) {
case 0:
Device::where('sn', $item['device_id'])->update(['status' => DeviceStatus::Offline]);
break;
case 1:
Device::where('sn', $item['device_id'])->update(['status' => DeviceStatus::Online]);
break;
}
}
} }
} }

View File

@ -12,6 +12,6 @@ class VerifyCsrfToken extends Middleware
* @var array<int, string> * @var array<int, string>
*/ */
protected $except = [ protected $except = [
'callback/*' 'callback/*',
]; ];
} }

View File

@ -6,7 +6,7 @@ use App\Enums\WindDirection;
use Illuminate\Database\Eloquent\Factories\HasFactory; use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Database\Eloquent\Model; use Illuminate\Database\Eloquent\Model;
class MeteorologicalMonitoringLog extends Model class MeteorologicalMonitoringHourlyLog extends Model
{ {
use HasFactory; use HasFactory;

View File

@ -5,7 +5,7 @@ namespace App\Models;
use Illuminate\Database\Eloquent\Factories\HasFactory; use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Database\Eloquent\Model; use Illuminate\Database\Eloquent\Model;
class SoilMonitoringLog extends Model class SoilMonitoringHourlyLog extends Model
{ {
use HasFactory; use HasFactory;

View File

@ -5,7 +5,7 @@ namespace App\Models;
use Illuminate\Database\Eloquent\Factories\HasFactory; use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Database\Eloquent\Model; use Illuminate\Database\Eloquent\Model;
class WaterQualityMonitoringLog extends Model class WaterQualityMonitoringHourlyLog extends Model
{ {
use HasFactory; use HasFactory;

View File

@ -0,0 +1,190 @@
<?php
namespace App\Services;
use App\Enums\DeviceType;
use App\Exceptions\BizException;
use App\Models\Device;
use App\Models\LinkosDeviceLog;
use App\Models\MeteorologicalMonitoringHourlyLog;
use App\Models\SoilMonitoringHourlyLog;
use App\Models\WaterQualityMonitoringHourlyLog;
use Carbon\Carbon;
use Illuminate\Support\Arr;
class LinkosDeviceLogService
{
/**
* 创建 Linkos 设备数据
*
* @param string $deviceId
* @param string $deviceUnit
* @param string $deviceCategory
* @param array $data
* @param \Carbon\Carbon $reportedAt
* @return \App\Models\LinkosDeviceLog
*
* @throws \App\Exceptions\BizException
*/
public function create(string $deviceId, string $deviceUnit, string $deviceCategory, array $data, Carbon $reportedAt): LinkosDeviceLog
{
$devices = Device::where('sn', $deviceId)->get();
if ($devices->isEmpty()) {
throw new BizException("设备未找到, 设备编号: {$deviceId}");
}
$log = LinkosDeviceLog::create([
'device_id' => $deviceId,
'device_unit' => $deviceUnit,
'device_category' => $deviceCategory,
'data' => $data,
'reported_at' => $reportedAt,
]);
if ($log->data) {
foreach ($devices as $device) {
match ($device->type) {
DeviceType::Soil => $this->handleSoilMonitoringLog($device, $log->data, $log->reported_at),
DeviceType::WaterQuality => $this->handleWaterQualityMonitoringLog($device, $log->data, $log->reported_at),
DeviceType::Meteorological => $this->handleMeteorologicalMonitoringLog($device, $log->data, $log->reported_at),
};
}
}
return $log;
}
/**
* 处理气象监控数据
*
* @param \App\Models\Device $device
* @param array $data
* @param \Carbon\Carbon $monitoredAt
* @return void
*/
public function handleMeteorologicalMonitoringLog(Device $device, array $data, Carbon $monitoredAt): void
{
$attributes = [
'wind_speed' => 'wind_speed',
'wind_power' => 'wind_power',
'wind_direction' => 'wind_direction',
'wind_degree' => 'wind_degree',
'air_humidity' => 'box_humidity',
'air_temperature' => 'box_temperature',
'air_pressure' => 'box_pressure',
'co2' => 'box_carbon',
'noise' => 'box_noise',
'illumination' => 'box_illumination',
'accumulated_rainfall' => 'accumulate_rainfall',
'current_rainfall' => 'current_rainfall',
'moment_rainfall' => 'moment_rainfall',
'day_rainfall' => 'day_rainfall',
'pm25' => 'pm25_concentration',
'pm10' => 'pm10_concentration',
];
if (! Arr::hasAny($data, $attributes)) {
return;
}
$hourlyLog = MeteorologicalMonitoringHourlyLog::firstOrCreate([
'device_id' => $device->id,
'monitored_at' => $monitoredAt->startOfHour(),
], [
'agricultural_base_id' => $device->agricultural_base_id,
]);
foreach ($attributes as $key => $value) {
if (! array_key_exists($value, $data)) {
continue;
}
$hourlyLog->{$key} = $data[$value];
}
$hourlyLog->save();
}
/**
* 处理水质监控数据
*
* @param \App\Models\Device $device
* @param array $data
* @param \Carbon\Carbon $monitoredAt
* @return void
*/
public function handleWaterQualityMonitoringLog(Device $device, array $data, Carbon $monitoredAt): void
{
$attributes = [
'conductivity' => 'conductivity',
'oxygen' => 'oxygen',
'chlorine' => 'chlorine',
'turbidity' => 'turbidity',
'temperature' => 'temp',
'ph' => 'ph',
];
if (! Arr::hasAny($data, $attributes)) {
return;
}
$hourlyLog = WaterQualityMonitoringHourlyLog::firstOrCreate([
'device_id' => $device->id,
'monitored_at' => $monitoredAt->startOfHour(),
], [
'agricultural_base_id' => $device->agricultural_base_id,
]);
foreach ($attributes as $key => $value) {
if (! array_key_exists($value, $data)) {
continue;
}
$hourlyLog->{$key} = $data[$value];
}
$hourlyLog->save();
}
/**
* 处理土壤监控数据
*
* @param \App\Models\Device $device
* @param array $data
* @param \Carbon\Carbon $monitoredAt
* @return void
*/
public function handleSoilMonitoringLog(Device $device, array $data, Carbon $monitoredAt): void
{
$attributes = [
'conductivity' => 'conductivity',
'humidity' => 'soil_humidity',
'temperature' => 'soil_temperature',
'n' => 'nitrogen_content',
'p' => 'phosphorus_content',
'k' => 'potassium_content',
];
if (! Arr::hasAny($data, $attributes)) {
return;
}
$hourlyLog = SoilMonitoringHourlyLog::firstOrCreate([
'device_id' => $device->id,
'monitored_at' => $monitoredAt->startOfHour(),
], [
'agricultural_base_id' => $device->agricultural_base_id,
]);
foreach ($attributes as $key => $value) {
if (! array_key_exists($value, $data)) {
continue;
}
$hourlyLog->{$key} = $data[$value];
}
$hourlyLog->save();
}
}

View File

@ -13,7 +13,7 @@ return new class extends Migration
*/ */
public function up() public function up()
{ {
Schema::create('soil_monitoring_logs', function (Blueprint $table) { Schema::create('soil_monitoring_hourly_logs', function (Blueprint $table) {
$table->id(); $table->id();
$table->unsignedBigInteger('device_id')->comment('设备ID'); $table->unsignedBigInteger('device_id')->comment('设备ID');
$table->unsignedBigInteger('agricultural_base_id')->comment('农业基地ID'); $table->unsignedBigInteger('agricultural_base_id')->comment('农业基地ID');
@ -23,10 +23,9 @@ return new class extends Migration
$table->integer('n')->nullable()->comment('氮 (单位: mg/kg)'); $table->integer('n')->nullable()->comment('氮 (单位: mg/kg)');
$table->integer('p')->nullable()->comment('磷 (单位: mg/kg)'); $table->integer('p')->nullable()->comment('磷 (单位: mg/kg)');
$table->integer('k')->nullable()->comment('钾 (单位: mg/kg)'); $table->integer('k')->nullable()->comment('钾 (单位: mg/kg)');
$table->timestamp('monitored_at')->comment('监控时间(小时)'); $table->timestamp('monitored_at')->comment('监控时间(小时), 示例: 2022-10-20 01:00:00');
$table->timestamps(); $table->timestamps();
// 索引
$table->index('agricultural_base_id'); $table->index('agricultural_base_id');
$table->unique(['device_id', 'monitored_at']); $table->unique(['device_id', 'monitored_at']);
}); });
@ -39,6 +38,6 @@ return new class extends Migration
*/ */
public function down() public function down()
{ {
Schema::dropIfExists('soil_monitoring_logs'); Schema::dropIfExists('soil_monitoring_hourly_logs');
} }
}; };

View File

@ -13,12 +13,12 @@ return new class extends Migration
*/ */
public function up() public function up()
{ {
Schema::create('meteorological_monitoring_logs', function (Blueprint $table) { Schema::create('meteorological_monitoring_hourly_logs', function (Blueprint $table) {
$table->id(); $table->id();
$table->unsignedBigInteger('device_id')->comment('设备ID'); $table->unsignedBigInteger('device_id')->comment('设备ID');
$table->unsignedBigInteger('agricultural_base_id')->comment('农业基地ID'); $table->unsignedBigInteger('agricultural_base_id')->comment('农业基地ID');
$table->decimal('wind_speed', 8, 2)->nullable()->comment('风速 (单位: m/s)'); $table->decimal('wind_speed', 8, 2)->nullable()->comment('风速 (单位: m/s)');
$table->integer('wind_power')->nullable()->comment('风力'); $table->tinyInteger('wind_power')->nullable()->comment('风力');
$table->tinyInteger('wind_direction')->nullable()->comment('风向: 0 北风, 1 东北风, 2 东风, 3 东南风, 4 南风, 5 西南风, 6 西风, 7 西北风'); $table->tinyInteger('wind_direction')->nullable()->comment('风向: 0 北风, 1 东北风, 2 东风, 3 东南风, 4 南风, 5 西南风, 6 西风, 7 西北风');
$table->integer('wind_degree')->nullable()->comment('风向度数'); $table->integer('wind_degree')->nullable()->comment('风向度数');
$table->decimal('air_humidity', 8, 2)->nullable()->comment('空气湿度 (单位: db)'); $table->decimal('air_humidity', 8, 2)->nullable()->comment('空气湿度 (单位: db)');
@ -35,10 +35,6 @@ return new class extends Migration
$table->integer('pm10')->nullable()->comment('PM10浓度 (单位: ug/m3)'); $table->integer('pm10')->nullable()->comment('PM10浓度 (单位: ug/m3)');
$table->timestamp('monitored_at')->comment('监控时间(小时)'); $table->timestamp('monitored_at')->comment('监控时间(小时)');
$table->timestamps(); $table->timestamps();
// 索引
$table->index('agricultural_base_id');
$table->unique(['device_id', 'monitored_at']);
}); });
} }
@ -49,6 +45,6 @@ return new class extends Migration
*/ */
public function down() public function down()
{ {
Schema::dropIfExists('meteorological_monitoring_logs'); Schema::dropIfExists('meteorological_monitoring_hourly_logs');
} }
}; };

View File

@ -13,7 +13,7 @@ return new class extends Migration
*/ */
public function up() public function up()
{ {
Schema::create('water_quality_monitoring_logs', function (Blueprint $table) { Schema::create('water_quality_monitoring_hourly_logs', function (Blueprint $table) {
$table->id(); $table->id();
$table->unsignedBigInteger('device_id')->comment('设备ID'); $table->unsignedBigInteger('device_id')->comment('设备ID');
$table->unsignedBigInteger('agricultural_base_id')->comment('农业基地ID'); $table->unsignedBigInteger('agricultural_base_id')->comment('农业基地ID');
@ -38,6 +38,6 @@ return new class extends Migration
*/ */
public function down() public function down()
{ {
Schema::dropIfExists('water_quality_monitoring_logs'); Schema::dropIfExists('water_quality_monitoring_hourly_logs');
} }
}; };